blob: e2315ec9780490705f245b8ed17c6a4799efaaf1 [file] [log] [blame]
<!doctype html>
<html lang="zh-CN" dir="ltr" class="docs-wrapper docs-doc-page docs-version-1.11.0 plugin-docs plugin-id-default docs-doc-id-data_node/load_node/kafka">
<head>
<meta charset="UTF-8">
<meta name="generator" content="Docusaurus v2.3.1">
<title data-rh="true">Kafka | Apache InLong</title><meta data-rh="true" name="viewport" content="width=device-width,initial-scale=1"><meta data-rh="true" name="twitter:card" content="summary_large_image"><meta data-rh="true" property="og:url" content="https://inlong.apache.org/zh-CN/docs/data_node/load_node/kafka"><meta data-rh="true" name="docusaurus_locale" content="zh-CN"><meta data-rh="true" name="docsearch:language" content="zh-CN"><meta data-rh="true" name="docusaurus_version" content="1.11.0"><meta data-rh="true" name="docusaurus_tag" content="docs-default-1.11.0"><meta data-rh="true" name="docsearch:version" content="1.11.0"><meta data-rh="true" name="docsearch:docusaurus_tag" content="docs-default-1.11.0"><meta data-rh="true" property="og:title" content="Kafka | Apache InLong"><meta data-rh="true" name="description" content="概览"><meta data-rh="true" property="og:description" content="概览"><link data-rh="true" rel="icon" href="/zh-CN/img/logo.svg"><link data-rh="true" rel="canonical" href="https://inlong.apache.org/zh-CN/docs/data_node/load_node/kafka"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/data_node/load_node/kafka" hreflang="en"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/zh-CN/docs/data_node/load_node/kafka" hreflang="zh-CN"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/data_node/load_node/kafka" hreflang="x-default"><link data-rh="true" rel="preconnect" href="https://YUW9QEL53E-dsn.algolia.net" crossorigin="anonymous"><link rel="alternate" type="application/rss+xml" href="/zh-CN/blog/rss.xml" title="Apache InLong RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/zh-CN/blog/atom.xml" title="Apache InLong Atom Feed">
<link rel="search" type="application/opensearchdescription+xml" title="Apache InLong" href="/zh-CN/opensearch.xml">
<script src="https://www.apachecon.com/event-images/snippet.js" async></script><link rel="stylesheet" href="/zh-CN/assets/css/styles.09deabdb.css">
<link rel="preload" href="/zh-CN/assets/js/runtime~main.933f32cb.js" as="script">
<link rel="preload" href="/zh-CN/assets/js/main.32851df5.js" as="script">
</head>
<body class="navigation-with-keyboard">
<script>!function(){function e(e){document.documentElement.setAttribute("data-theme",e)}var t=function(){var e=null;try{e=localStorage.getItem("theme")}catch(e){}return e}();null!==t?e(t):window.matchMedia("(prefers-color-scheme: dark)").matches?e("dark"):(window.matchMedia("(prefers-color-scheme: light)").matches,e("light"))}()</script><div id="__docusaurus">
<div role="region" aria-label="Skip to main content"><a class="skipToContent_fXgn" href="#docusaurus_skipToContent_fallback">Skip to main content</a></div><nav aria-label="主导航" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="切换导航栏" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/zh-CN/"><div class="navbar__logo"><img src="/zh-CN/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--light_HNdA"><img src="/zh-CN/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--dark_i4oU"></div><b class="navbar__title text--truncate">Apache InLong</b></a></div><div class="navbar__items navbar__items--right"><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a class="navbar__link" aria-haspopup="true" aria-expanded="false" role="button" href="/zh-CN/docs/introduction">文档</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/zh-CN/docs/next/introduction">Next</a></li><li><a class="dropdown__link" href="/zh-CN/docs/introduction">1.11.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.10.0/introduction">1.10.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.9.0/introduction">1.9.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.8.0/introduction">1.8.0</a></li><li><a class="dropdown__link" href="/zh-CN/versions/">All versions</a></li></ul></div><a class="navbar__item navbar__link" href="/zh-CN/downloads">下载</a><a class="navbar__item navbar__link" href="/zh-CN/community/how-to-contribute">社区</a><a class="navbar__item navbar__link" href="/zh-CN/blog">博客</a><a class="navbar__item navbar__link" href="/zh-CN/team">团队</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Apache</a><ul class="dropdown__menu"><li><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Apache Software Foundation</a></li><li><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener noreferrer" class="dropdown__link">证书</a></li><li><a href="https://www.apache.org/events/current-event" target="_blank" rel="noopener noreferrer" class="dropdown__link">事件</a></li><li><a href="https://www.apache.org/security/" target="_blank" rel="noopener noreferrer" class="dropdown__link">安全</a></li><li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">赞助</a></li><li><a href="https://www.apache.org/foundation/policies/privacy.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Privacy</a></li><li><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">致谢</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link"><svg viewBox="0 0 24 24" width="20" height="20" aria-hidden="true" class="iconLanguage_nlXk"><path fill="currentColor" d="M12.87 15.07l-2.54-2.51.03-.03c1.74-1.94 2.98-4.17 3.71-6.53H17V4h-7V2H8v2H1v1.99h11.17C11.5 7.92 10.44 9.75 9 11.35 8.07 10.32 7.3 9.19 6.69 8h-2c.73 1.63 1.73 3.17 2.98 4.56l-5.09 5.02L4 19l5-5 3.11 3.11.76-2.04zM18.5 10h-2L12 22h2l1.12-3h4.75L21 22h2l-4.5-12zm-2.62 7l1.62-4.33L19.12 17h-3.24z"></path></svg>简体中文</a><ul class="dropdown__menu"><li><a href="/docs/data_node/load_node/kafka" target="_self" rel="noopener noreferrer" class="dropdown__link" lang="en">English</a></li><li><a href="/zh-CN/docs/data_node/load_node/kafka" target="_self" rel="noopener noreferrer" class="dropdown__link dropdown__link--active" lang="zh-CN">简体中文</a></li></ul></div><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a><div class="toggle_vylO colorModeToggle_DEke"><button class="clean-btn toggleButton_gllP toggleButtonDisabled_aARS" type="button" disabled="" title="切换浅色/暗黑模式(当前为浅色模式)" aria-label="切换浅色/暗黑模式(当前为浅色模式)" aria-live="polite"><svg viewBox="0 0 24 24" width="24" height="24" class="lightToggleIcon_pyhR"><path fill="currentColor" d="M12,9c1.65,0,3,1.35,3,3s-1.35,3-3,3s-3-1.35-3-3S10.35,9,12,9 M12,7c-2.76,0-5,2.24-5,5s2.24,5,5,5s5-2.24,5-5 S14.76,7,12,7L12,7z M2,13l2,0c0.55,0,1-0.45,1-1s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S1.45,13,2,13z M20,13l2,0c0.55,0,1-0.45,1-1 s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S19.45,13,20,13z M11,2v2c0,0.55,0.45,1,1,1s1-0.45,1-1V2c0-0.55-0.45-1-1-1S11,1.45,11,2z M11,20v2c0,0.55,0.45,1,1,1s1-0.45,1-1v-2c0-0.55-0.45-1-1-1C11.45,19,11,19.45,11,20z M5.99,4.58c-0.39-0.39-1.03-0.39-1.41,0 c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0s0.39-1.03,0-1.41L5.99,4.58z M18.36,16.95 c-0.39-0.39-1.03-0.39-1.41,0c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0c0.39-0.39,0.39-1.03,0-1.41 L18.36,16.95z M19.42,5.99c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06c-0.39,0.39-0.39,1.03,0,1.41 s1.03,0.39,1.41,0L19.42,5.99z M7.05,18.36c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06 c-0.39,0.39-0.39,1.03,0,1.41s1.03,0.39,1.41,0L7.05,18.36z"></path></svg><svg viewBox="0 0 24 24" width="24" height="24" class="darkToggleIcon_wfgR"><path fill="currentColor" d="M9.37,5.51C9.19,6.15,9.1,6.82,9.1,7.5c0,4.08,3.32,7.4,7.4,7.4c0.68,0,1.35-0.09,1.99-0.27C17.45,17.19,14.93,19,12,19 c-3.86,0-7-3.14-7-7C5,9.07,6.81,6.55,9.37,5.51z M12,3c-4.97,0-9,4.03-9,9s4.03,9,9,9s9-4.03,9-9c0-0.46-0.04-0.92-0.1-1.36 c-0.98,1.37-2.58,2.26-4.4,2.26c-2.98,0-5.4-2.42-5.4-5.4c0-1.81,0.89-3.42,2.26-4.4C12.92,3.04,12.46,3,12,3L12,3z"></path></svg></button></div><div class="searchBox_ZlJk"><button type="button" class="DocSearch DocSearch-Button" aria-label="搜索"><span class="DocSearch-Button-Container"><svg width="20" height="20" class="DocSearch-Search-Icon" viewBox="0 0 20 20"><path d="M14.386 14.386l4.0877 4.0877-4.0877-4.0877c-2.9418 2.9419-7.7115 2.9419-10.6533 0-2.9419-2.9418-2.9419-7.7115 0-10.6533 2.9418-2.9419 7.7115-2.9419 10.6533 0 2.9419 2.9418 2.9419 7.7115 0 10.6533z" stroke="currentColor" fill="none" fill-rule="evenodd" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">搜索</span></span><span class="DocSearch-Button-Keys"></span></button></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0 docsWrapper_BCFX"><button aria-label="回到顶部" class="clean-btn theme-back-to-top-button backToTopButton_sjWU" type="button"></button><div class="docPage__5DB"><aside class="theme-doc-sidebar-container docSidebarContainer_b6E3"><div class="sidebarViewport_Xe31"><div class="sidebar_njMd"><nav aria-label="文档侧边栏" class="menu thin-scrollbar menu_SIkG"><ul class="theme-doc-sidebar-menu menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/zh-CN/docs/introduction">简介</a></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/design_and_concept/basic_concept">设计和概念</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/quick_start/how_to_build">快速开始</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/deployment/standalone">安装部署</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/modules/agent/overview">组件介绍</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" href="/zh-CN/docs/data_node/extract_node/overview">数据节点</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/data_node/extract_node/overview">Extract Nodes</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" tabindex="0" href="/zh-CN/docs/data_node/load_node/overview">Load Nodes</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/overview">总览</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/auto_consumption">自主消费</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/clickhouse">ClickHouse</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/elasticsearch">Elasticsearch</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/greenplum">Greenplum</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/hbase">HBase</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/hdfs">HDFS</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/hive">Hive</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/iceberg">Iceberg</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link menu__link--active" aria-current="page" tabindex="0" href="/zh-CN/docs/data_node/load_node/kafka">Kafka</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/mysql">MySQL</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/oracle">Oracle</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/postgresql">PostgreSQL</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/sqlserver">SQLServer</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/tdsql-postgresql">TDSQL-PostgreSQL</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/doris">Doris</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/starrocks">StarRocks</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/hudi">Hudi</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/data_node/load_node/redis">Redis</a></li></ul></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/sdk/dataproxy-sdk/cpp">SDK</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/user_guide/dashboard_usage">用户指引</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/development/inlong_msg">开发指引</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/administration/user_management">Administration</a></div></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/zh-CN/docs/contact">联系我们</a></li></ul></nav></div></div></aside><main class="docMainContainer_gTbr"><div class="container padding-top--md padding-bottom--lg"><div class="row"><div class="col docItemCol_VOVn"><div class="docItemContainer_Djhp"><article><nav class="theme-doc-breadcrumbs breadcrumbsContainer_Z_bl" aria-label="页面路径"><ul class="breadcrumbs" itemscope="" itemtype="https://schema.org/BreadcrumbList"><li class="breadcrumbs__item"><a aria-label="主页面" class="breadcrumbs__link" href="/zh-CN/"><svg viewBox="0 0 24 24" class="breadcrumbHomeIcon_YNFT"><path d="M10 19v-5h4v5c0 .55.45 1 1 1h3c.55 0 1-.45 1-1v-7h1.7c.46 0 .68-.57.33-.87L12.67 3.6c-.38-.34-.96-.34-1.34 0l-8.36 7.53c-.34.3-.13.87.33.87H5v7c0 .55.45 1 1 1h3c.55 0 1-.45 1-1z" fill="currentColor"></path></svg></a></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">数据节点</span><meta itemprop="position" content="1"></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">Load Nodes</span><meta itemprop="position" content="2"></li><li itemscope="" itemprop="itemListElement" itemtype="https://schema.org/ListItem" class="breadcrumbs__item breadcrumbs__item--active"><span class="breadcrumbs__link" itemprop="name">Kafka</span><meta itemprop="position" content="3"></li></ul></nav><span class="theme-doc-version-badge badge badge--secondary">版本:1.11.0</span><div class="tocCollapsible_ETCw theme-doc-toc-mobile tocMobile_ITEo"><button type="button" class="clean-btn tocCollapsibleButton_TO0P">On this page</button></div><div class="theme-doc-markdown markdown"><header><h1>Kafka</h1></header><h2 class="anchor anchorWithStickyNavbar_LWe7" id="概览">概览<a href="#概览" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p><code>Kafka Load</code> 节点支持写数据到 Kafka topics。 它支持以普通的方式写入数据和 Upsert 的方式写入数据。 upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="支持的版本">支持的版本<a href="#支持的版本" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><table><thead><tr><th>Load 节点</th><th>Kafka 版本</th></tr></thead><tbody><tr><td><a href="/zh-CN/docs/data_node/load_node/kafka">Kafka</a></td><td>0.10+</td></tr></tbody></table><h2 class="anchor anchorWithStickyNavbar_LWe7" id="依赖">依赖<a href="#依赖" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>为了设置 Kafka Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="maven-依赖">Maven 依赖<a href="#maven-依赖" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><div class="codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-text codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">&lt;dependency&gt;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &lt;groupId&gt;org.apache.inlong&lt;/groupId&gt;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &lt;artifactId&gt;sort-connector-kafka&lt;/artifactId&gt;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &lt;version&gt;1.11.0-SNAPSHOT&lt;/version&gt;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">&lt;/dependency&gt;</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="如何创建-kafka-load-节点">如何创建 Kafka Load 节点<a href="#如何创建-kafka-load-节点" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="sql-api-用法">SQL API 用法<a href="#sql-api-用法" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>下面这个例子展示了如何用 <code>Flink SQL</code> 创建一个 Kafka Load 节点:</p><ul><li>连接器是 <code>kafka-inlong</code></li></ul><div class="language-sql codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-sql codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token comment" style="color:rgb(98, 114, 164)">-- 使用 Flink SQL 创建 Kafka 表 &#x27;kafka_load_node&#x27;</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">Flink </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">SQL</span><span class="token operator">&gt;</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">CREATE</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">TABLE</span><span class="token plain"> kafka_load_node </span><span class="token punctuation" style="color:rgb(248, 248, 242)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token identifier">id</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">INT</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token identifier">name</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token plain"> STRINTG</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">)</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">WITH</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;connector&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;kafka-inlong&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;topic&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;user&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;properties.bootstrap.servers&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;localhost:9092&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;properties.group.id&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;testGroup&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;format&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;csv&#x27;</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><ul><li>连接器是 <code>upsert-kafka</code></li></ul><div class="language-sql codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-sql codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token comment" style="color:rgb(98, 114, 164)">-- 使用 Flink SQL 创建 Kafka 表 &#x27;kafka_load_node&#x27;</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">Flink </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">SQL</span><span class="token operator">&gt;</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">CREATE</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">TABLE</span><span class="token plain"> kafka_load_node </span><span class="token punctuation" style="color:rgb(248, 248, 242)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token identifier">id</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">INT</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token identifier">name</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token plain"> STRINTG</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">PRIMARY</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">KEY</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">(</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token identifier">id</span><span class="token identifier punctuation" style="color:rgb(248, 248, 242)">`</span><span class="token punctuation" style="color:rgb(248, 248, 242)">)</span><span class="token plain"> </span><span class="token operator">NOT</span><span class="token plain"> ENFORCED</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">)</span><span class="token plain"> </span><span class="token keyword" style="color:rgb(189, 147, 249);font-style:italic">WITH</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;connector&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;upsert-kafka-inlong&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;topic&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;user&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;properties.bootstrap.servers&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;localhost:9092&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;key.format&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;csv&#x27;</span><span class="token punctuation" style="color:rgb(248, 248, 242)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;value.format&#x27;</span><span class="token plain"> </span><span class="token operator">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(255, 121, 198)">&#x27;csv&#x27;</span><span class="token plain"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(248, 248, 242)">)</span><span class="token plain"> </span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h3 class="anchor anchorWithStickyNavbar_LWe7" id="inlong-dashboard-用法">InLong Dashboard 用法<a href="#inlong-dashboard-用法" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>创建数据流时,数据流方向选择<code>Kafka</code>,点击“添加”进行配置。</p><p><img loading="lazy" alt="Kafka Configuration" src="/zh-CN/assets/images/kafka-608e2ab7a2ba34e56c677584ad65b22e.png" width="1479" height="661" class="img_ev3q"></p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="inlong-manager-client-用法">InLong Manager Client 用法<a href="#inlong-manager-client-用法" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>TODO: 将在未来支持此功能。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="kafka-load-节点参数">Kafka Load 节点参数<a href="#kafka-load-节点参数" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><table><thead><tr><th>参数</th><th>是否必选</th><th>默认值</th><th>数据类型</th><th>描述</th></tr></thead><tbody><tr><td>connector</td><td>必选</td><td>(none)</td><td>String</td><td>指定要使用的连接器 1. Upsert Kafka 连接器使用: <code>upsert-kafka-inlong</code> 2. Kafka连接器使用: <code>kafka-inlong</code></td></tr><tr><td>topic</td><td>必选</td><td>(none)</td><td>String</td><td>当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 <code>topic-1;topic-2</code>。注意,对 source 表而言,<code>topic</code><code>topic-pattern</code> 两个选项只能使用其中一个。</td></tr><tr><td>topic-pattern</td><td>可选</td><td>(none)</td><td>String</td><td>动态 Topic 提取模式, 形如 &#x27;${VARIABLE_NAME}&#x27;, 仅用于 Kafka 多 Sink 场景且当 &#x27;format&#x27; 为 &#x27;raw&#x27; 时有效。</td></tr><tr><td>sink.multiple.format</td><td>可选</td><td>(none)</td><td>String</td><td>Kafka 原始数据的 Format, 目前仅支持 <!-- -->[canal-json<!-- -->|<!-- -->debezium-json]<!-- --> 仅用于 Kafka 多 Sink 场景且当 &#x27;format&#x27; 为 &#x27;raw&#x27; 时有效。</td></tr><tr><td>properties.bootstrap.servers</td><td>必选</td><td>(none)</td><td>String</td><td>逗号分隔的 Kafka broker 列表。</td></tr><tr><td>properties.*</td><td>可选</td><td>(none)</td><td>String</td><td>可以设置和传递任意 Kafka 的配置项。后缀名必须匹配在 <a href="https://kafka.apache.org/documentation/#configuration" target="_blank" rel="noopener noreferrer">Kafka 配置文档</a> 中定义的配置键。Flink 将移除 &quot;properties.&quot; 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 &#x27;properties.allow.auto.create.topics&#x27; = &#x27;false&#x27; 来禁用 topic 的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 &#x27;key.deserializer&#x27; 和 &#x27;value.deserializer&#x27;。</td></tr><tr><td>format</td><td>对于 Kafka 必选</td><td>(none)</td><td>String</td><td>用来序列化或反序列化 Kafka 消息的格式。 请参阅 <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/" target="_blank" rel="noopener noreferrer">格式</a> 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 &#x27;value.format&#x27; 二者必需其一。</td></tr><tr><td>key.format</td><td>可选</td><td>(none)</td><td>String</td><td>用来序列化和反序列化 Kafka 消息键(Key)的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项 &#x27;key.fields&#x27; 也是必需的。 否则 Kafka 记录将使用空值作为键。</td></tr><tr><td>key.fields</td><td>可选</td><td>[]</td><td><code>List&lt;String&gt;</code></td><td>表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 &#x27;field1;field2&#x27;。</td></tr><tr><td>key.fields-prefix</td><td>可选</td><td>(none)</td><td>String</td><td>为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 &#x27;key.fields&#x27; 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 &#x27;value.fields-include&#x27; 配置为 &#x27;EXCEPT_KEY&#x27;。</td></tr><tr><td>value.format</td><td>必选 for upsert Kafka</td><td>(none)</td><td>String</td><td>用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 &#x27;csv&#x27;、&#x27;json&#x27;、&#x27;avro&#x27;。请参考<a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/overview/" target="_blank" rel="noopener noreferrer">格式</a> 页面以获取更多详细信息和格式参数。</td></tr><tr><td>value.fields-include</td><td>可选</td><td>ALL</td><td>Enum Possible values: <!-- -->[ALL, EXCEPT_KEY]</td><td>控制哪些字段应该出现在 value 中。可取值:<br> ALL:消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。<br> EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。</td></tr><tr><td>sink.partitioner</td><td>可选</td><td>&#x27;default&#x27;</td><td>String</td><td>Flink partition 到 Kafka partition 的分区映射关系,可选值有:<br>default:使用 Kafka 默认的分区器对消息进行分区。<br>fixed:每个 Flink partition 最终对应最多一个 Kafka partition。<br>round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。<br>raw-hash: 基于 &#x27;sink.multiple.partition-pattern&#x27; 提取值作 &#x27;hash&#x27; 以确定最终的分区, 仅用于 Kafka 多 Sink 场景且当 &#x27;format&#x27; 为 &#x27;raw&#x27; 时有效。只有当未指定消息的消息键时生效。<br>自定义 FlinkKafkaPartitioner 的子类:例如 &#x27;org.mycompany.MyPartitioner&#x27;。请参阅 <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/#sink-%E5%88%86%E5%8C%BA" target="_blank" rel="noopener noreferrer">Sink 分区</a> 以获取更多细节。</td></tr><tr><td>sink.multiple.partition-pattern</td><td>可选</td><td>(none)</td><td>String</td><td>动态 Partition 提取模式, 形如 &#x27;${VARIABLE_NAME}&#x27;仅用于 Kafka 多 Sink 场景且当 &#x27;format&#x27; 为 &#x27;raw&#x27;、&#x27;sink.partitioner&#x27; 为 &#x27;raw-hash&#x27; 时有效。</td></tr><tr><td>sink.semantic</td><td>可选</td><td>at-least-once</td><td>String</td><td>定义 Kafka sink 的语义。有效值为 &#x27;at-least-once&#x27;,&#x27;exactly-once&#x27; 和 &#x27;none&#x27;。请参阅 <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/#%E4%B8%80%E8%87%B4%E6%80%A7%E4%BF%9D%E8%AF%81" target="_blank" rel="noopener noreferrer">一致性保证</a> 以获取更多细节。</td></tr><tr><td>sink.parallelism</td><td>可选</td><td>(none)</td><td>Integer</td><td>定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。</td></tr><tr><td>inlong.metric.labels</td><td>可选</td><td>(none)</td><td>String</td><td>inlong metric 的标签值,该值的构成为groupId=<code>{groupId}</code>&amp;streamId=<code>{streamId}</code>&amp;nodeId=<code>{nodeId}</code></td></tr></tbody></table><h2 class="anchor anchorWithStickyNavbar_LWe7" id="可用的元数据字段">可用的元数据字段<a href="#可用的元数据字段" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>支持为格式 <code>canal-json-inlong</code>写元数据。</p><p>参考 <a href="/zh-CN/docs/data_node/extract_node/kafka">Kafka Extract Node</a> 关于元数据的列表。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="特征">特征<a href="#特征" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="支持动态-schema-写入">支持动态 Schema 写入<a href="#支持动态-schema-写入" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>动态 Schema 写入支持从数据中动态提取 Topic 和 Partition, 并写入到对应的 Topic
和 Partition。为了支持动态 Schema 写入,需要设置 Kafka 的 Format 格式为 &#x27;raw&#x27;,
同时需要设置上游数据的序列化格式(通过选项 &#x27;sink.multiple.format&#x27;
来设置, 目前仅支持 <!-- -->[canal-json|debezium-json]<!-- -->)。</p><h4 class="anchor anchorWithStickyNavbar_LWe7" id="动态-topic-提取">动态 Topic 提取<a href="#动态-topic-提取" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h4><p>动态 Topic 提取即通过解析 Topic Pattern 并从数据中提取 Topic 。
为了支持动态提取 Topic, 需要设置选项 &#x27;topic-pattern&#x27;, Kafka Load Node 会解析 &#x27;topic-pattern&#x27; 作为最终的 Topic,
如果解析失败, 会写入通过 &#x27;topic&#x27; 设置的默认 Topic 中。&#x27;topic-pattern&#x27; 支持常量和变量,常量就是字符串常量,
变量是严格通过 &#x27;${VARIABLE_NAME}&#x27; 来表示, 变量的取值来自于数据本身, 即可以是通过 &#x27;sink.multiple.format&#x27;
指定的某种 Format 的元数据字段, 也可以是数据中的物理字段。</p><p>关于 &#x27;topic-parttern&#x27; 的例子如下:</p><ul><li>&#x27;sink.multiple.format&#x27; 为 &#x27;canal-json&#x27;:</li></ul><p>上游数据为:</p><div class="codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-text codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">{</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;data&quot;: [</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: &quot;111&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;name&quot;: &quot;scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;description&quot;: &quot;Big 2-wheel scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: &quot;5.18&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ],</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;database&quot;: &quot;inventory&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;es&quot;: 1589373560000,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: 9,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;isDdl&quot;: false,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;mysqlType&quot;: {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: &quot;INTEGER&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;name&quot;: &quot;VARCHAR(255)&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;description&quot;: &quot;VARCHAR(512)&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: &quot;FLOAT&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> },</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;old&quot;: [</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: &quot;5.15&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ],</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;pkNames&quot;: [</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ],</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;sql&quot;: &quot;&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;sqlType&quot;: {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: 4,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;name&quot;: 12,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;description&quot;: 12,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: 7</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> },</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;table&quot;: &quot;products&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;ts&quot;: 1589373560798,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;type&quot;: &quot;UPDATE&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">} </span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>&#x27;topic-pattern&#x27; 为 &#x27;{database}_${table}&#x27;, 提取后的 Topic 为 &#x27;inventory_products&#x27; (&#x27;database&#x27;, &#x27;table&#x27; 为元数据字段)</p><p>&#x27;topic-pattern&#x27; 为 &#x27;{database}<em>${table}</em>${id}&#x27;, 提取后的 Topic 为 &#x27;inventory_products_111&#x27; (&#x27;database&#x27;, &#x27;table&#x27; 为元数据字段, &#x27;id&#x27; 为物理字段)</p><ul><li>&#x27;sink.multiple.format&#x27; 为 &#x27;debezium-json&#x27;:</li></ul><p>上游数据为:</p><div class="codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-text codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">{</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;before&quot;: {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: 4,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;name&quot;: &quot;scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;description&quot;: &quot;Big 2-wheel scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: 5.18</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> },</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;after&quot;: {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;id&quot;: 4,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;name&quot;: &quot;scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;description&quot;: &quot;Big 2-wheel scooter&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;weight&quot;: 5.15</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> },</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;source&quot;: {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;db&quot;: &quot;inventory&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;table&quot;: &quot;products&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> },</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;op&quot;: &quot;u&quot;,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;ts_ms&quot;: 1589362330904,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> &quot;transaction&quot;: null</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>&#x27;topic-pattern&#x27; 为 &#x27;{database}_${table}&#x27;, 提取后的 Topic 为 &#x27;inventory_products&#x27; (&#x27;source.db&#x27;, &#x27;source.table&#x27; 为元数据字段)</p><p>&#x27;topic-pattern&#x27; 为 &#x27;{database}<em>${table}</em>${id}&#x27;, 提取后的 Topic 为 &#x27;inventory_products_4&#x27; (&#x27;source.db&#x27;, &#x27;source.table&#x27; 为元数据字段, &#x27;id&#x27; 为物理字段)</p><h4 class="anchor anchorWithStickyNavbar_LWe7" id="动态-partition-提取">动态 Partition 提取<a href="#动态-partition-提取" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h4><p>动态 Partition 提取即通过解析 Partition Pattern 并从数据中提取 Partition, 这和动态 Topic 提取类似。
为了支持动态提取 Topic, 需要设置选项 &#x27;sink.partitioner&#x27; 为 &#x27;raw-hash&#x27;
和选项 &#x27;sink.multiple.partition-pattern&#x27;, Kafka Load Node 会解析 &#x27;sink.multiple.partition-pattern&#x27;
作为 Partition key, 并对 Partition key 进行 Hash 和对 Partition Size 取余以确定最终 Partition,
如果解析失败, 会返回 null 并执行 Kafka 默认的分区策略。&#x27;sink.multiple.partition-pattern&#x27;
支持常量、变量和主键,常量就是字符串常量, 变量是严格通过 ${VARIABLE_NAME} 来表示, 变量的取值来自于数据本身,
即可以是通过 &#x27;sink.multiple.format&#x27; 指定的某种 Format 的元数据字段, 也可以是数据中的物理字段,
主键是一种特殊的常量 &#x27;PRIMARY_KEY&#x27;, 基于某种 Format 的数据格式下来提取该条记录的主键值。</p><p>注意: 基于 &#x27;PRIMARY_KEY&#x27; 的 Kafka 动态 Partition 提取, 有一个限制, 即需要在数据中指定主键信息,
例如 Format 为 &#x27;canal-json&#x27;, 则其主键 Key 为 &#x27;pkNames&#x27;。另外由于 Format &#x27;debezium-json&#x27; 无主键的定义, 对此
我们约定 &#x27;debezium-json&#x27; 的主键 Key 也为 &#x27;pkNames&#x27; 且和其他元数据字段如 &#x27;table&#x27;、&#x27;db&#x27; 一样包含在 &#x27;source&#x27;中,
如果用到了按主键分区, 且 Format 为 &#x27;debezium-json&#x27;, 需要确保真实数据满足上述约定。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="数据类型映射">数据类型映射<a href="#数据类型映射" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>Kafka 将消息键值以二进制进行存储,因此 Kafka 并不存在 schema 或数据类型。Kafka 消息使用格式配置进行序列化和反序列化,例如 csv,json,avro。 因此,数据类型映射取决于使用的格式。请参阅 <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/overview/" target="_blank" rel="noopener noreferrer">格式</a> 页面以获取更多细节。</p></div><footer class="theme-doc-footer docusaurus-mt-lg"><div class="theme-doc-footer-edit-meta-row row"><div class="col"><a href="https://github.com/apache/inlong-website/edit/master/versioned_docs/version-1.11.0/data_node/load_node/kafka.md" target="_blank" rel="noreferrer noopener" class="theme-edit-this-page"><svg fill="currentColor" height="20" width="20" viewBox="0 0 40 40" class="iconEdit_Z9Sw" aria-hidden="true"><g><path d="m34.5 11.7l-3 3.1-6.3-6.3 3.1-3q0.5-0.5 1.2-0.5t1.1 0.5l3.9 3.9q0.5 0.4 0.5 1.1t-0.5 1.2z m-29.5 17.1l18.4-18.5 6.3 6.3-18.4 18.4h-6.3v-6.2z"></path></g></svg>Edit this page</a></div><div class="col lastUpdated_vwxv"></div></div></footer></article><nav class="pagination-nav docusaurus-mt-lg" aria-label="Docs pages navigation"><a class="pagination-nav__link pagination-nav__link--prev" href="/zh-CN/docs/data_node/load_node/iceberg"><div class="pagination-nav__sublabel">Previous</div><div class="pagination-nav__label">Iceberg</div></a><a class="pagination-nav__link pagination-nav__link--next" href="/zh-CN/docs/data_node/load_node/mysql"><div class="pagination-nav__sublabel">Next</div><div class="pagination-nav__label">MySQL</div></a></nav></div></div><div class="col col--3"><div class="tableOfContents_bqdL thin-scrollbar theme-doc-toc-desktop"><ul class="table-of-contents table-of-contents__left-border"><li><a href="#概览" class="table-of-contents__link toc-highlight">概览</a></li><li><a href="#支持的版本" class="table-of-contents__link toc-highlight">支持的版本</a></li><li><a href="#依赖" class="table-of-contents__link toc-highlight">依赖</a><ul><li><a href="#maven-依赖" class="table-of-contents__link toc-highlight">Maven 依赖</a></li></ul></li><li><a href="#如何创建-kafka-load-节点" class="table-of-contents__link toc-highlight">如何创建 Kafka Load 节点</a><ul><li><a href="#sql-api-用法" class="table-of-contents__link toc-highlight">SQL API 用法</a></li><li><a href="#inlong-dashboard-用法" class="table-of-contents__link toc-highlight">InLong Dashboard 用法</a></li><li><a href="#inlong-manager-client-用法" class="table-of-contents__link toc-highlight">InLong Manager Client 用法</a></li></ul></li><li><a href="#kafka-load-节点参数" class="table-of-contents__link toc-highlight">Kafka Load 节点参数</a></li><li><a href="#可用的元数据字段" class="table-of-contents__link toc-highlight">可用的元数据字段</a></li><li><a href="#特征" class="table-of-contents__link toc-highlight">特征</a><ul><li><a href="#支持动态-schema-写入" class="table-of-contents__link toc-highlight">支持动态 Schema 写入</a></li></ul></li><li><a href="#数据类型映射" class="table-of-contents__link toc-highlight">数据类型映射</a></li></ul></div></div></div></div></main></div></div><footer class="footer"><div class="container container-fluid"><div class="row footer__links"><div class="col footer__col"><div class="footer__title">事件</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://www.apachecon.com/" target="_blank" rel="noopener noreferrer" class="footer__link-item">ApacheCon<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a class="acevent" data-format="square" data-mode="dark" data-event="random"></a></li></ul></div><div class="col footer__col"><div class="footer__title">社区</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://twitter.com/ApacheInlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">Twitter<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://inlong.apache.org/img/apache-inlong-wechat.jpg" target="_blank" rel="noopener noreferrer" class="footer__link-item">WeChat<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="mailto:dev@inlong.apache.org" target="_blank" rel="noopener noreferrer" class="footer__link-item">Email</a></li></ul></div><div class="col footer__col"><div class="footer__title">更多</div><ul class="footer__items clean-list"><li class="footer__item"><a class="footer__link-item" href="/zh-CN/blog">Blog</a></li><li class="footer__item"><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li></ul></div></div><div class="footer__bottom text--center"><div class="margin-bottom--sm"><img src="/zh-CN/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--light_HNdA footer__logo"><img src="/zh-CN/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--dark_i4oU footer__logo"></div><div class="footer__copyright"><div style="font-family: Avenir-Medium;font-size: 14px;color: #999;">
<div>Copyright © 2020-2024 The Apache Software Foundation. Licensed under the Apache License, Version 2.0.</div>
<div style="margin-top: 20px; padding-top: 20px; border-top: 1px solid #666;line-height: 20px;">The Apache Software Foundation Apache InLong, InLong, Apache, the Apache feather, and the Apache InLong project logo are either registered trademarks or trademarks of the Apache Software Foundation.</div>
</div></div></div></div></footer></div>
<script src="/zh-CN/assets/js/runtime~main.933f32cb.js"></script>
<script src="/zh-CN/assets/js/main.32851df5.js"></script>
</body>
</html>