<!-- blob: bf535ac2e30871311e1764a4ef5b08a841524259 [file] [log] [blame] -->
<!doctype html>
<html lang="zh-CN" dir="ltr" class="docs-wrapper docs-doc-page docs-version-current plugin-docs plugin-id-default docs-doc-id-modules/tubemq/client_rpc">
<head>
<meta charset="UTF-8">
<meta name="generator" content="Docusaurus v2.3.1">
<title data-rh="true">客户端RPC | Apache InLong</title><meta data-rh="true" name="viewport" content="width=device-width,initial-scale=1"><meta data-rh="true" name="twitter:card" content="summary_large_image"><meta data-rh="true" property="og:url" content="https://inlong.apache.org/zh-CN/docs/next/modules/tubemq/client_rpc"><meta data-rh="true" name="docusaurus_locale" content="zh-CN"><meta data-rh="true" name="docsearch:language" content="zh-CN"><meta data-rh="true" name="docusaurus_version" content="current"><meta data-rh="true" name="docusaurus_tag" content="docs-default-current"><meta data-rh="true" name="docsearch:version" content="current"><meta data-rh="true" name="docsearch:docusaurus_tag" content="docs-default-current"><meta data-rh="true" property="og:title" content="客户端RPC | Apache InLong"><meta data-rh="true" name="description" content="1 总体介绍:"><meta data-rh="true" property="og:description" content="1 总体介绍:"><link data-rh="true" rel="icon" href="/zh-CN/img/logo.svg"><link data-rh="true" rel="canonical" href="https://inlong.apache.org/zh-CN/docs/next/modules/tubemq/client_rpc"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/next/modules/tubemq/client_rpc" hreflang="en"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/zh-CN/docs/next/modules/tubemq/client_rpc" hreflang="zh-CN"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/next/modules/tubemq/client_rpc" hreflang="x-default"><link data-rh="true" rel="preconnect" href="https://YUW9QEL53E-dsn.algolia.net" crossorigin="anonymous"><link rel="alternate" type="application/rss+xml" href="/zh-CN/blog/rss.xml" title="Apache InLong RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/zh-CN/blog/atom.xml" title="Apache InLong Atom Feed">
<link rel="search" type="application/opensearchdescription+xml" title="Apache InLong" href="/zh-CN/opensearch.xml">
<script src="https://www.apachecon.com/event-images/snippet.js" async></script><link rel="stylesheet" href="/zh-CN/assets/css/styles.09deabdb.css">
<link rel="preload" href="/zh-CN/assets/js/runtime~main.933f32cb.js" as="script">
<link rel="preload" href="/zh-CN/assets/js/main.32851df5.js" as="script">
</head>
<body class="navigation-with-keyboard">
<script>
// Theme bootstrap: set the "data-theme" attribute on <html> as early as
// possible, using the stored preference when one exists and the OS-level
// color-scheme preference otherwise.
(function () {
  // Write the chosen theme name onto the root element.
  function applyTheme(themeName) {
    document.documentElement.setAttribute("data-theme", themeName);
  }

  // Read the persisted choice; storage access may throw, in which case the
  // value simply stays null.
  var storedTheme = (function () {
    var value = null;
    try {
      value = localStorage.getItem("theme");
    } catch (ignored) {}
    return value;
  })();

  if (storedTheme !== null) {
    applyTheme(storedTheme);
  } else if (window.matchMedia("(prefers-color-scheme: dark)").matches) {
    applyTheme("dark");
  } else {
    // The original evaluates this media query and discards the result
    // before falling back to the light theme; the evaluation is kept as-is.
    window.matchMedia("(prefers-color-scheme: light)").matches;
    applyTheme("light");
  }
})();
</script><div id="__docusaurus">
<div role="region" aria-label="Skip to main content"><a class="skipToContent_fXgn" href="#docusaurus_skipToContent_fallback">Skip to main content</a></div><nav aria-label="主导航" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="切换导航栏" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/zh-CN/"><div class="navbar__logo"><img src="/zh-CN/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--light_HNdA"><img src="/zh-CN/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--dark_i4oU"></div><b class="navbar__title text--truncate">Apache InLong</b></a></div><div class="navbar__items navbar__items--right"><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a class="navbar__link" aria-haspopup="true" aria-expanded="false" role="button" href="/zh-CN/docs/introduction">文档</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/zh-CN/docs/next/introduction">Next</a></li><li><a class="dropdown__link" href="/zh-CN/docs/introduction">1.11.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.10.0/introduction">1.10.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.9.0/introduction">1.9.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.8.0/introduction">1.8.0</a></li><li><a class="dropdown__link" href="/zh-CN/versions/">All versions</a></li></ul></div><a class="navbar__item navbar__link" href="/zh-CN/downloads">下载</a><a class="navbar__item navbar__link" href="/zh-CN/community/how-to-contribute">社区</a><a class="navbar__item navbar__link" href="/zh-CN/blog">博客</a><a class="navbar__item navbar__link" href="/zh-CN/team">团队</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a 
href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Apache</a><ul class="dropdown__menu"><li><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Apache Software Foundation</a></li><li><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener noreferrer" class="dropdown__link">证书</a></li><li><a href="https://www.apache.org/events/current-event" target="_blank" rel="noopener noreferrer" class="dropdown__link">事件</a></li><li><a href="https://www.apache.org/security/" target="_blank" rel="noopener noreferrer" class="dropdown__link">安全</a></li><li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">赞助</a></li><li><a href="https://www.apache.org/foundation/policies/privacy.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Privacy</a></li><li><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">致谢</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link"><svg viewBox="0 0 24 24" width="20" height="20" aria-hidden="true" class="iconLanguage_nlXk"><path fill="currentColor" d="M12.87 15.07l-2.54-2.51.03-.03c1.74-1.94 2.98-4.17 3.71-6.53H17V4h-7V2H8v2H1v1.99h11.17C11.5 7.92 10.44 9.75 9 11.35 8.07 10.32 7.3 9.19 6.69 8h-2c.73 1.63 1.73 3.17 2.98 4.56l-5.09 5.02L4 19l5-5 3.11 3.11.76-2.04zM18.5 10h-2L12 22h2l1.12-3h4.75L21 22h2l-4.5-12zm-2.62 7l1.62-4.33L19.12 17h-3.24z"></path></svg>简体中文</a><ul class="dropdown__menu"><li><a href="/docs/next/modules/tubemq/client_rpc" target="_self" rel="noopener noreferrer" class="dropdown__link" lang="en">English</a></li><li><a href="/zh-CN/docs/next/modules/tubemq/client_rpc" target="_self" rel="noopener noreferrer" class="dropdown__link dropdown__link--active" 
lang="zh-CN">简体中文</a></li></ul></div><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a><div class="toggle_vylO colorModeToggle_DEke"><button class="clean-btn toggleButton_gllP toggleButtonDisabled_aARS" type="button" disabled="" title="切换浅色/暗黑模式(当前为浅色模式)" aria-label="切换浅色/暗黑模式(当前为浅色模式)" aria-live="polite"><svg viewBox="0 0 24 24" width="24" height="24" class="lightToggleIcon_pyhR"><path fill="currentColor" d="M12,9c1.65,0,3,1.35,3,3s-1.35,3-3,3s-3-1.35-3-3S10.35,9,12,9 M12,7c-2.76,0-5,2.24-5,5s2.24,5,5,5s5-2.24,5-5 S14.76,7,12,7L12,7z M2,13l2,0c0.55,0,1-0.45,1-1s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S1.45,13,2,13z M20,13l2,0c0.55,0,1-0.45,1-1 s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S19.45,13,20,13z M11,2v2c0,0.55,0.45,1,1,1s1-0.45,1-1V2c0-0.55-0.45-1-1-1S11,1.45,11,2z M11,20v2c0,0.55,0.45,1,1,1s1-0.45,1-1v-2c0-0.55-0.45-1-1-1C11.45,19,11,19.45,11,20z M5.99,4.58c-0.39-0.39-1.03-0.39-1.41,0 c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0s0.39-1.03,0-1.41L5.99,4.58z M18.36,16.95 c-0.39-0.39-1.03-0.39-1.41,0c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0c0.39-0.39,0.39-1.03,0-1.41 L18.36,16.95z M19.42,5.99c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06c-0.39,0.39-0.39,1.03,0,1.41 s1.03,0.39,1.41,0L19.42,5.99z M7.05,18.36c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06 c-0.39,0.39-0.39,1.03,0,1.41s1.03,0.39,1.41,0L7.05,18.36z"></path></svg><svg viewBox="0 0 24 24" width="24" height="24" class="darkToggleIcon_wfgR"><path fill="currentColor" d="M9.37,5.51C9.19,6.15,9.1,6.82,9.1,7.5c0,4.08,3.32,7.4,7.4,7.4c0.68,0,1.35-0.09,1.99-0.27C17.45,17.19,14.93,19,12,19 
c-3.86,0-7-3.14-7-7C5,9.07,6.81,6.55,9.37,5.51z M12,3c-4.97,0-9,4.03-9,9s4.03,9,9,9s9-4.03,9-9c0-0.46-0.04-0.92-0.1-1.36 c-0.98,1.37-2.58,2.26-4.4,2.26c-2.98,0-5.4-2.42-5.4-5.4c0-1.81,0.89-3.42,2.26-4.4C12.92,3.04,12.46,3,12,3L12,3z"></path></svg></button></div><div class="searchBox_ZlJk"><button type="button" class="DocSearch DocSearch-Button" aria-label="搜索"><span class="DocSearch-Button-Container"><svg width="20" height="20" class="DocSearch-Search-Icon" viewBox="0 0 20 20"><path d="M14.386 14.386l4.0877 4.0877-4.0877-4.0877c-2.9418 2.9419-7.7115 2.9419-10.6533 0-2.9419-2.9418-2.9419-7.7115 0-10.6533 2.9418-2.9419 7.7115-2.9419 10.6533 0 2.9419 2.9418 2.9419 7.7115 0 10.6533z" stroke="currentColor" fill="none" fill-rule="evenodd" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">搜索</span></span><span class="DocSearch-Button-Keys"></span></button></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0 docsWrapper_BCFX"><button aria-label="回到顶部" class="clean-btn theme-back-to-top-button backToTopButton_sjWU" type="button"></button><div class="docPage__5DB"><aside class="theme-doc-sidebar-container docSidebarContainer_b6E3"><div class="sidebarViewport_Xe31"><div class="sidebar_njMd"><nav aria-label="文档侧边栏" class="menu thin-scrollbar menu_SIkG"><ul class="theme-doc-sidebar-menu menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/zh-CN/docs/next/introduction">简介</a></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/design_and_concept/basic_concept">设计和概念</a></div></li><li 
class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/quick_start/how_to_build">快速开始</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/deployment/standalone">安装部署</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" href="/zh-CN/docs/next/modules/agent/overview">组件介绍</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/agent/overview">Agent</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/dataproxy/overview">DataProxy</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" tabindex="0" 
href="/zh-CN/docs/next/modules/tubemq/overview">TubeMQ</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/overview">总览</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/quick_start">快速开始</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/producer_example">生产者示例</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/consumer_example">消费者示例</a></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-3 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/tubemq-manager/overview">tubemq-manager</a></div></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/commandline_tools">命令行工具</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/client_partition_assign_introduction">客户端分区分配</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link menu__link--active" aria-current="page" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/client_rpc">客户端RPC</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 
menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/clients_java">JAVA SDK API</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/configure_introduction">配置参数</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/console_introduction">TubeMQ管控台操作指引</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/error_code">错误码定义</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/http_access_api">HTTP API介绍</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/tubemq_metrics">TubeMQ指标</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/tubemq_perf_test_vs_Kafka">TubeMQ VS Kafka</a></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/sort/overview">Sort</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" 
href="/zh-CN/docs/next/modules/manager/overview">Manager</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/dashboard/overview">Dashboard</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/sort-standalone/overview">Sort Standalone</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/audit/overview">Audit</a></div></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/data_node/extract_node/overview">数据节点</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/sdk/dataproxy-sdk/cpp">SDK</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist 
menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/user_guide/dashboard_usage">用户指引</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/development/inlong_msg">开发指引</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/administration/user_management">Administration</a></div></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/zh-CN/docs/next/contact">联系我们</a></li></ul></nav></div></div></aside><main class="docMainContainer_gTbr"><div class="container padding-top--md padding-bottom--lg"><div class="row"><div class="col docItemCol_VOVn"><div class="theme-doc-version-banner alert alert--warning margin-bottom--md" role="alert"><div>This is unreleased documentation for <!-- -->Apache InLong<!-- --> <b>Next</b> version.</div><div class="margin-top--md">For up-to-date documentation, see the <b><a href="/zh-CN/docs/modules/tubemq/client_rpc">latest version</a></b> (<!-- -->1.11.0<!-- -->).</div></div><div class="docItemContainer_Djhp"><article><nav class="theme-doc-breadcrumbs breadcrumbsContainer_Z_bl" aria-label="页面路径"><ul class="breadcrumbs" itemscope="" itemtype="https://schema.org/BreadcrumbList"><li class="breadcrumbs__item"><a aria-label="主页面" class="breadcrumbs__link" href="/zh-CN/"><svg viewBox="0 0 24 24" class="breadcrumbHomeIcon_YNFT"><path d="M10 19v-5h4v5c0 .55.45 1 1 1h3c.55 0 1-.45 1-1v-7h1.7c.46 0 .68-.57.33-.87L12.67 3.6c-.38-.34-.96-.34-1.34 0l-8.36 7.53c-.34.3-.13.87.33.87H5v7c0 .55.45 
1 1 1h3c.55 0 1-.45 1-1z" fill="currentColor"></path></svg></a></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">组件介绍</span><meta itemprop="position" content="1"></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">TubeMQ</span><meta itemprop="position" content="2"></li><li itemscope="" itemprop="itemListElement" itemtype="https://schema.org/ListItem" class="breadcrumbs__item breadcrumbs__item--active"><span class="breadcrumbs__link" itemprop="name">客户端RPC</span><meta itemprop="position" content="3"></li></ul></nav><span class="theme-doc-version-badge badge badge--secondary">版本:Next</span><div class="tocCollapsible_ETCw theme-doc-toc-mobile tocMobile_ITEo"><button type="button" class="clean-btn tocCollapsibleButton_TO0P">On this page</button></div><div class="theme-doc-markdown markdown"><header><h1>客户端RPC</h1></header><h2 class="anchor anchorWithStickyNavbar_LWe7" id="1-总体介绍">1 总体介绍:<a href="#1-总体介绍" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>Apache InLong TubeMQ 模块的各个节点间(Client、Master、Broker)通过 TCP 协议长连接交互,其消息采用的是 【二进制 + Protobuf 编码】组合方式进行定义。</p><p><img loading="lazy" src="/zh-CN/assets/images/rpc_bytes_def-c8c63b21ec5ea017431db345384baa2d.png" width="1049" height="614" class="img_ev3q"></p><p>在 TCP 里我们看到的都是二进制流,包括:</p><ul><li>msgToken: 消息头字段 <code>RPC_PROTOCOL_BEGIN_TOKEN</code>,4 字节,用来区分每一条消息以及识别对端的合法性,如果客户端收到的响应消息不是以该字段开始,说明连接方非本系统支持的协议,或者返回数据出现了异常,这个时候需要关闭该连接,提示错误退出或者重连</li><li>serialNo:消息序列号,4 字节,主要用于客户端关联请求响应的上下文,由请求方生成,通过请求消息携带给服务端,服务器端完成该请求消息服务后通过请求消息的对应响应消息原样返回</li><li>listSize:4 字节,表示接下来按照 PB 编码的数据块个数,目前协议定义下该字段不为0</li><li><code>[&lt;len&gt;&lt;data&gt;]</code>: 是 2 个字段组合,表示这个数据块长度及数据内容<ul><li>len:即数据块长度</li><li>data:数据</li></ul></li></ul><blockquote><p>为什么会以 <code>listSize [&lt;len&gt;&lt;data&gt;]</code> 形式定义 Protobuf(下文简称 PB)数据内容?</p><p>因为在 TubeMQ 的实现中,序列化后的 PB 数据是通过 ByteBuffer 对象保存的,Java里 ByteBuffer 存在一个最大块长 8196,超过单个块长度的 PB 消息内容就需要用多个ByteBuffer 保存,序列化到 TCP 
消息时候,没有统计总长,直接按照 PB 序列化的 ByteBuffer 列表写入到了消息中。 </p><p><strong>在多语言实现时候,这块需要特别注意:</strong> 需要将 PB 数据内容序列化成块数组(PB 编解码里有对应支持)。</p></blockquote><p>客户端 RPC 对应的实现在 <code>org.apache.inlong.tubemq.corerpc</code> 模块下。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="2-pb格式编码">2 PB格式编码:<a href="#2-pb格式编码" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>PB 协议分为三个部分:</p><ul><li>RPC 框架定义:<code>RPC.proto</code></li><li>Master 相关的消息编码:<code>MasterService.proto</code></li><li>Broker 相关的消息编码:<code>BrokerService.proto</code></li></ul><p>可以通过 PB 直接编译得到对应的实现类。</p><p>RPC.proto定义了 6 个结构,分为 2 种类型:</p><ul><li>请求消息</li><li>响应消息,包括正常的响应返回以及抛异常情况下的响应返回</li></ul><p>请求消息编码及响应消息解码可以参考 <code>NettyClient.java</code> 类实现,这个部分的定义存在一些改进空间,具体见 <a href="https://issues.apache.org/jira/browse/TUBEMQ-109" target="_blank" rel="noopener noreferrer">TUBEMQ-109</a>,但由于兼容性考虑,会逐步的替换,按照当前 proto 版本实现至少在 1.0.0 版本前交互不是问题,但 1.0.0 时会考虑用新协议,协议实现模块需要各个 SDK 预留出改进空间。</p><p>以请求消息填写为例,RpcConnHeader等相关结构如下:</p><div class="language-protobuf codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-protobuf codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">message RpcConnHeader {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int32 flag = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int64 traceId = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int64 spanId = 3;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int64 parentId = 4;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span><span 
class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">message RequestHeader {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int32 serviceType = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int32 protocolVer = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">message RequestBody {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int32 method = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int64 timeout = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional bytes request = 3;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><code>RpcConnHeader</code> 的 <code>flag</code> 标记的是否请求消息,后面 3 个字段标记的是消息跟踪的相关内容,目前没有使用;</p><p><code>RequestHeader</code> 包含了服务类型,协议版本相关信息;</p><p><code>RequestBody</code> 则包含了请求方法,超时时间,请求内容; 其中 
<code>timeout</code> 是一个请求被服务器收到到实际处理时的最大允许等待时间长,超过就丢弃,目前缺省为10秒。 请求填写具体见如下部分:</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">RequestWrapper requestWrapper =</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> new RequestWrapper(PbEnDecoder.getServiceIdByServiceName(targetInterface),</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> RpcProtocol.RPC_PROTOCOL_VERSION,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> RpcConstants.RPC_FLAG_MSG_TYPE_REQUEST,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> requestTimeout); // 请求超时时间</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="3-客户端的pb请求响应交互图">3 客户端的PB请求响应交互图:<a href="#3-客户端的pb请求响应交互图" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="31-producer交互图">3.1 Producer交互图:<a href="#31-producer交互图" class="hash-link" aria-label="Direct link to heading" title="Direct link to 
heading"></a></h3><p>Producer 在系统中一共 4 对指令,到 master 是要做注册,心跳,退出操作;到 broker 只有发送消息:
<img loading="lazy" src="/zh-CN/assets/images/rpc_producer_diagram-ed4bb00e58a1ecec65f60f7c921b538c.png" alt="Producer 交互图" width="974" height="694" class="img_ev3q"></p><p>从这里我们可以看到,Producer 实现逻辑就是从 Master 侧获取指定 Topic 对应的分区列表等元数据信息,获得这些信息后按照客户端的规则选择分区并把消息发送给对应的 Broker。</p><p>Producer 的 <strong>多语言实现的时候需要注意:</strong></p><ul><li><p>Master 是有主备节点的,只有主节点可以提供服务,当 producer 连接到备节点时,会得到 <code>StandbyException</code>, 此时需要连接到其他的 Master 节点,直到连接到主节点为止;</p></li><li><p>生产过程中遇到 Master 连接失败时,比如超时,连接被动断开等,Producer 要进行重注册;</p></li><li><p>Producer 要注意提前做到 Broker 的预连接操作:后端集群的 Broker 节点可达上百台,再叠加每个 Broker 有十个左右的分区,关于分区记录就会存在上千条可能,SDK 从 Master 收到元数据信息后,要提前对暂未建链的 Broker 进行连接建立操作;</p></li><li><p>Producer 到 Broker 的连接要注意异常检测,长期运行场景,要能检测出 Broker 故障节点,并且对于长期不发消息的连接,要将其回收,避免运行不稳定。</p></li></ul><h3 class="anchor anchorWithStickyNavbar_LWe7" id="32-consumer交互图">3.2 Consumer交互图:<a href="#32-consumer交互图" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>Consumer 一共 7 对指令,到 master 是要做注册,心跳,退出操作;到 broker 包括注册,注销,心跳,拉取消息,确认消息 4 对,其中到 Broker 的注册注销是同一个命令,用了不同的状态码表示:
<img loading="lazy" src="/zh-CN/assets/images/rpc_consumer_diagram-048bd92863ee20cdf21f684cef98258f.png" alt="Consumer 交互图" width="990" height="694" class="img_ev3q"></p><p>从上图我们可以看到,Consumer 首先要注册到 Master,但注册到 Master 时并没有立即获取到元数据信息,原因是 TubeMQ 采用的是服务器端负载均衡模式,客户端需要等待服务器派发消费分区信息;Consumer 到 Broker 需要进行注册注销操作,原因在于消费时候分区是独占消费,即同一时刻同一分区只能被同组的一个消费者进行消费,为了解决这个问题,需要客户端进行注册,获得分区的消费权限;消息拉取与消费确认需要成对出现,虽然协议支持多次拉取然后最后一次确认处理,但客户端可能因超时丢失分区的消费权限,从而触发数据回滚和重复消费,数据积攒得越多重复消费的量就越多,所以按照 1:1 的提交比较合适。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="4-客户端功能集合">4 客户端功能集合:<a href="#4-客户端功能集合" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><table><thead><tr><th><strong>特性</strong></th><th><strong>Java</strong></th><th><strong>C/C++</strong></th><th><strong>Go</strong></th><th><strong>Python</strong></th><th><strong>Rust</strong></th><th><strong>备注</strong></th></tr></thead><tbody><tr><td>TLS</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>认证授权</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>防绕 Master 生产消费</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>分布式系统里防止客户端不经过 Master 的认证授权即访问 Broker</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>Effectively-Once</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>精确指定分区 Offset 消费</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>单个组消费多个 Topic 消费</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>服务器过滤消费</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>生产节点坏点自动屏蔽</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>通过算法检测坏点,自动屏蔽故障 Broker 的数据发送</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>断链自动重连</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>空闲连接自动回收</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>超过指定分钟不活跃,主要是生产端,比如 3 
分钟</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>连接复用</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>连接按照 sessionFactory 共用或者不共用</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>非连接复用</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>异步生产</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>同步生产</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>Pull 消费</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>Push 消费</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>消费限流</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>控制单位时间消费者消费的数据量</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>消费拉取频控</td><td></td><td></td><td></td><td></td><td></td><td></td></tr><tr><td>控制消费者拉取消息的频度</td><td></td><td></td><td></td><td></td><td></td><td></td></tr></tbody></table><h2 class="anchor anchorWithStickyNavbar_LWe7" id="5-客户端功能-casebycase-实现介绍">5 客户端功能 CaseByCase 实现介绍:<a href="#5-客户端功能-casebycase-实现介绍" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="51-客户端与服务器端-rpc-交互过程">5.1 客户端与服务器端 RPC 交互过程:<a href="#51-客户端与服务器端-rpc-交互过程" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><hr><p><img loading="lazy" src="/zh-CN/assets/images/rpc_inner_structure-3438098f8c404d1bbf3876abb80db693.png" width="729" height="449" class="img_ev3q"></p><p>如上图示,客户端要维持已发请求消息的本地保存,直到RPC超时,或者收到响应消息,响应消息通过请求发送时生成的SerialNo关联;从服务器端收到的Broker信息,以及Topic信息,SDK要保存在本地,并根据最新的返回信息进行更新,以及定期的上报给服务器端;SDK要维持到Master或者Broker的心跳,如果发现Master反馈注册超时错误时,要进行重注册操作;SDK要基于Broker进行连接建立,同一个进程不同对象之间,要允许业务进行选择,是支持按对象建立连接,还是按照进程建立连接。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="52-producer-到-master-注册">5.2 Producer 到 Master 注册:<a href="#52-producer-到-master-注册" class="hash-link" 
aria-label="Direct link to heading" title="Direct link to heading"></a></h3><hr><div class="language-protobuf codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-protobuf codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">message RegisterRequestP2M {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required string clientId = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> repeated string topicList = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int64 brokerCheckSum = 3;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required string hostName = 4;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional MasterCertificateInfo authInfo = 5;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional string jdkVersion = 6;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional ApprovedClientConfig appdConfig = 7;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">message RegisterResponseM2P {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required bool success = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int32 errCode = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span 
class="token plain"> required string errMsg = 3;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int64 brokerCheckSum = 4;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> repeated string brokerInfos = 5;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional MasterAuthorizedInfo authorizedInfo = 6;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional ApprovedClientConfig appdConfig = 7;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>ClientId</strong>:Producer 需要在启动时候构造一个 ClientId,目前的构造规则是:</p><p>Java的SDK 版本里 </p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">ClientId = consumerGroup + &quot;_&quot;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> + AddressUtils.getLocalAddress() + &quot;_&quot; // 本机IP (IPV4)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> + pid + &quot;_&quot; 
// 进程ID</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> + timestamp + &quot;_&quot; // 时间戳</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> + counter + &quot;_&quot; // 自增计数器</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> + consumerType + &quot;_&quot; // 消费者类型,包含 Pull 和 Push 两种类型</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> + clientVersion; // 客户端版本号</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>建议其他语言增加如上标记,以便于问题排查。该ID值在Producer生命周期内有效;</p><p><strong>TopicList</strong>:是用户发布的 Topic 列表,Producer 在初始化时候会提供初始的待发布数据的 Topic 列表,在运行中也允许业务通过 publish 函数延迟的增加新的 Topic,但不支持运行中减少 topic;</p><p><strong>brokerCheckSum</strong>:客户端本地保存的 Broker 元数据信息的校验值,初始启动时候 Producer 本地是没有该数据的,取 -1 值;SDK 需要在每次请求时把上次的 brokerCheckSum 值携带上,Master 通过比较该值来确定客户端的元数据是否需要更新;</p><p><strong>hostName</strong>:Producer 所在机器的 IPV4 地址值;</p><p><strong>success</strong>:操作是否成功,成功为 true,失败为 false;</p><p><strong>errCode</strong>:如果失败,错误码是多少,目前错误码是大类错误码,具体错误原因需要由 errMsg 具体判明;</p><p><strong>errMsg</strong>:具体的错误信息,如果出错,SDK 需要把具体错误信息打出来;</p><p><strong>authInfo</strong>:认证授权信息,如果用户配置里填写了启动认证处理,则进行填写;如果是要求认证,则按照用户名及密码的签名进行上报,如果是运行中,比如心跳时,如果 Master 强制认证处理,则按照用户名及密码签名上报,没有的话则根据之前交互时 Master 提供的授权 Token 进行认证;该授权 Token 在生产时候也用于到 Broker 的消息生产时携带。</p><p><strong>brokerInfos</strong>:Broker 元数据信息,该字段里主要是 Master 反馈给 Producer 的整个集群的 
Broker 信息列表;其格式如下:</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">public BrokerInfo(String strBrokerInfo, int brokerPort) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> String[] strBrokers =</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> strBrokerInfo.split(TokenConstants.ATTR_SEP);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.brokerId = Integer.parseInt(strBrokers[0]);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.host = strBrokers[1];</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.port = brokerPort;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (!TStringUtils.isBlank(strBrokers[2])) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.port = Integer.parseInt(strBrokers[2]);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.buildStrInfo();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 
2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>authorizedInfo</strong>:Master 提供的授权信息,格式如下:</p><div class="language-protobuf codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-protobuf codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">message MasterAuthorizedInfo {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int64 visitAuthorizedToken = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional string authAuthorizedToken = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>visitAuthorizedToken</strong>:防客户端绕开 Master 的访问授权 Token,如果有该数据,SDK 要保存本地,并且在后续访问 Broker 时携带该信息;如果后续心跳时该字段有变更,则需要更新本地缓存的该字段数据;</p><p><strong>authAuthorizedToken</strong>:认证通过的授权 Token,如果有该字段数据,要保存,并且在后续访问 Master 及 Broker 时携带该字段信息;如果后续心跳时该字段有变更,则需要更新本地缓存的该字段数据;</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="53-producer-到-master-保持心跳">5.3 Producer 到 Master 保持心跳:<a 
href="#53-producer-到-master-保持心跳" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><hr><div class="language-protobuf codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-protobuf codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">message HeartRequestP2M {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required string clientId = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int64 brokerCheckSum = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required string hostName = 3;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> repeated string topicList = 4;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional MasterCertificateInfo authInfo = 5;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional ApprovedClientConfig appdConfig = 6;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">message HeartResponseM2P {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required bool success = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int32 errCode = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required string errMsg = 3;</span><br></span><span 
class="token-line" style="color:#F8F8F2"><span class="token plain"> required int64 brokerCheckSum = 4;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> /* brokerId:host:port-topic:partitionNum */</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> repeated string topicInfos = 5;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> repeated string brokerInfos = 6;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional bool requireAuth = 7;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional MasterAuthorizedInfo authorizedInfo = 8;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional ApprovedClientConfig appdConfig = 9;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>topicInfos</strong>:SDK 发布的 Topic 对应的元数据信息,包括分区信息以及所在的 Broker,具体解码方式如下,由于元数据非常的多,如果将对象数据原样透传所产生的出流量会非常的大,所以我们通过编码方式做了改进:</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" 
style="color:#F8F8F2"><span class="token plain">public static Tuple2&lt;Map&lt;String, Integer&gt;, List&lt;TopicInfo&gt;&gt; convertTopicInfo(</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Map&lt;Integer, BrokerInfo&gt; brokerInfoMap, List&lt;String&gt; strTopicInfos) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> List&lt;TopicInfo&gt; topicList = new ArrayList&lt;&gt;();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Map&lt;String, Integer&gt; topicMaxSizeInBMap = new ConcurrentHashMap&lt;&gt;();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (strTopicInfos == null || strTopicInfos.isEmpty()) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return new Tuple2&lt;&gt;(topicMaxSizeInBMap, topicList);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> String[] strInfo;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> String[] strTopicInfoSet;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> String[] strTopicInfo;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> BrokerInfo brokerInfo;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> for (String info : strTopicInfos) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (info == null || info.isEmpty()) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> continue;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" 
style="color:#F8F8F2"><span class="token plain"> info = info.trim();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> strInfo = info.split(TokenConstants.SEGMENT_SEP, -1);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> for (String s : strTopicInfoSet) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> strTopicInfo = s.split(TokenConstants.ATTR_SEP);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0]));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (brokerInfo != null) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> topicList.add(new TopicInfo(brokerInfo,</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> strInfo[0], Integer.parseInt(strTopicInfo[1]),</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Integer.parseInt(strTopicInfo[2]), true, true));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (strInfo.length == 2 || TStringUtils.isEmpty(strInfo[2])) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> continue;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> try {</span><br></span><span class="token-line" 
style="color:#F8F8F2"><span class="token plain"> topicMaxSizeInBMap.put(strInfo[0], Integer.parseInt(strInfo[2]));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> } catch (Throwable e) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> //</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return new Tuple2&lt;&gt;(topicMaxSizeInBMap, topicList);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>requireAuth</strong>:标识Master之前的授权访问码(authAuthorizedToken)过期,要求SDK下一次请求,进行用户名及密码的签名信息上报;</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="54-producer-到-master-关闭退出">5.4 Producer 到 Master 关闭退出:<a href="#54-producer-到-master-关闭退出" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><hr><div class="language-protobuf codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-protobuf codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" 
style="color:#F8F8F2"><span class="token plain">message CloseRequestP2M{</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required string clientId = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional MasterCertificateInfo authInfo = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">message CloseResponseM2P{</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required bool success = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int32 errCode = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required string errMsg = 3;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>需要注意的是,如果认证开启,关闭会做认证,以避免外部干扰操作。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="55-producer-到-broker-发送消息">5.5 Producer 到 Broker 发送消息:<a href="#55-producer-到-broker-发送消息" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><hr><p>该部分的内容主要和 Message 的定义有关联,其中</p><div 
class="language-protobuf codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-protobuf codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">message SendMessageRequestP2B {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required string clientId = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required string topicName = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int32 partitionId = 3;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required bytes data = 4;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int32 flag = 5;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int32 checkSum = 6;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int32 sentAddr = 7;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional string msgType = 8;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional string msgTime = 9;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional AuthorizedInfo authInfo = 10;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">message SendMessageResponseB2P {</span><br></span><span class="token-line" 
style="color:#F8F8F2"><span class="token plain"> required bool success = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required int32 errCode = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> required string errMsg = 3;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional bool requireAuth = 4;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int64 messageId = 5;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int64 appendTime = 6;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int64 appendOffset = 7;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>Data</strong>是 Message 的二进制字节流:</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">private byte[] encodePayload(final Message message) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span 
class="token plain"> final byte[] payload = message.getData();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> final String attribute = message.getAttribute();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (TStringUtils.isBlank(attribute)) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return payload;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> byte[] attrData = StringUtils.getBytesUtf8(attribute);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> final ByteBuffer buffer =</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ByteBuffer.allocate(4 + attrData.length + payload.length);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> buffer.putInt(attrData.length);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> buffer.put(attrData);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> buffer.put(payload);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return buffer.array();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path 
d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>sentAddr</strong>是 SDK 所在的本机 IPv4 地址转成 32 位的数字 ID;</p><p><strong>msgType</strong>是过滤的消息类型,msgTime 是 SDK 发消息时的消息时间,其值来源于构造 Message 时通过 putSystemHeader 填写的值,在 Message 里有对应的 API 获取;</p><p><strong>requireAuth</strong>:到 Broker 进行数据生产的要求认证操作,考虑性能问题,目前未生效,发送消息里填写的 authAuthorizedToken 值以 Master 侧提供的值为准,并且随 Master 侧改变而改变。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="56-分区负载均衡过程">5.6 分区负载均衡过程:<a href="#56-分区负载均衡过程" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><hr><p>Apache InLong TubeMQ 模块目前采用的是服务器端负载均衡模式,均衡过程由服务器管理维护;后续版本会增加客户端负载均衡模式,形成 2 种模式共存的情况,由业务根据需要选择不同的均衡方式。</p><p><strong>服务器端负载均衡过程如下</strong></p><ul><li>Master进程启动后,会启动负载均衡线程 balancerChore,balancerChore 定时检查当前已注册的消费组,进行负载均衡处理。过程简单来说就是将消费组订阅的分区均匀的分配给已注册的客户端,并定期检测客户端当前分区数是否超过预定的数量,如果超过则将多余的分区拆分给其他数量少的客户端。具体过程:首先 Master 检查当前消费组是否需要做负载均衡,如果需要,则将消费组订阅的 topic 集合的所有分区,以及这个消费组的所有消费者 ID 进行排序,然后按照消费组的所有分区数以及客户端个数进行整除及取模,获得每个客户端至多订阅的分区数;然后给每个客户端分配分区,并在消费者订阅时将分区信息在心跳响应里携带;如果客户端当前已有的分区有多余,则给该客户端一条分区释放指令,将该分区从该消费者这里进行分区释放,同时给被分配的消费者一条分区分配的指令,告知分区分配给了对应客户端,具体指令如下:</li></ul><div class="language-protobuf codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-protobuf codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">message EventProto{</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int64 rebalanceId = 1;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int32 opType = 2;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> optional int32 status = 3;</span><br></span><span class="token-line" 
style="color:#F8F8F2"><span class="token plain"> /* consumerId@group-brokerId:host:port-topic:partitionId */</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> repeated string subscribeInfo = 4;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>rebalanceId</strong>:是一个自增 ID 的 long 数值,表示负载均衡的轮次;</p><p><strong>opType</strong>:为操作码,值在 EventType 中定义,目前已实现的操作码只有如下 4 个部分:释放连接,建立连接;only<!-- -->_<!-- -->xxx 目前没有扩展开,收到心跳里携带的负载均衡信息后,Consumer 根据这个值做对应的业务操作;</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">switch (event.getType()) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> case DISCONNECT:</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> case ONLY_DISCONNECT:</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> disconnectFromBroker(event);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> rebalanceResults.put(event);</span><br></span><span 
class="token-line" style="color:#F8F8F2"><span class="token plain"> break;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> case CONNECT:</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> case ONLY_CONNECT:</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> connect2Broker(event);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> rebalanceResults.put(event);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> break;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> case REPORT:</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> reportSubscribeInfo();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> break;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> case STOPREBALANCE:</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> break;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> default:</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> throw new TubeClientException(strBuffer</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> .append(&quot;Invalid rebalance opCode:&quot;)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> .append(event.getType()).toString());</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path 
d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>status</strong>:表示该事件状态,在 EventStatus 里定义。Master 构造好负载均衡处理任务时设置指令时状态为 TODO;客户端心跳请求过来时,master 将该任务写到响应消息里,设置该指令状态为 PROCESSING;客户端从心跳响应里收到负载均衡指令,进行实际的连接或者断链操作,操作结束后,设置指令状态为 DONE,并等待下一次心跳请求发出时反馈给 Master;</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">public enum EventStatus {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> /**</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> * To be processed state.</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> * */</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> TODO(0, &quot;To be processed&quot;),</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> /**</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> * On processing state.</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> * */</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> PROCESSING(1, &quot;Being processed&quot;),</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> /**</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> * Processed 
state.</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> * */</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> DONE(2, &quot;Process Done&quot;),</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> /**</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> * Unknown state.</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> * */</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> UNKNOWN(-1, &quot;Unknown event status&quot;),</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> /**</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> * Failed state.</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> * */</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> FAILED(-2, &quot;Process failed&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p><strong>subscribeInfo</strong>表示分配的分区信息,格式如注释提示。</p><ul><li>消费端操作:消费端收到 Master 返回的元数据信息后,就进行连接建立和释放操作,见上面 opType 
的注解,在连接建立好后,返回事件的处理结果给到 Master,从而完成相关的收到任务,执行任务,以及返回任务处理结果的操作;需要注意的是,负载均衡的注册是尽力而为的操作,如果消费端发起连接操作,但之前占用分区的消费者还没有来得及退出时,会收到 <code>PARTITION_OCCUPIED</code> 的错误响应,这个时候就将该分区从尝试队列删除;而之前分区消费者在收到对应响应后仍会做删除操作,从而下一轮的负载均衡时分配到这个分区的消费者成功注册到分区上。</li></ul><hr><a href="#top">Back to top</a></div><footer class="theme-doc-footer docusaurus-mt-lg"><div class="theme-doc-footer-edit-meta-row row"><div class="col"><a href="https://github.com/apache/inlong-website/edit/master/docs/modules/tubemq/client_rpc.md" target="_blank" rel="noreferrer noopener" class="theme-edit-this-page"><svg fill="currentColor" height="20" width="20" viewBox="0 0 40 40" class="iconEdit_Z9Sw" aria-hidden="true"><g><path d="m34.5 11.7l-3 3.1-6.3-6.3 3.1-3q0.5-0.5 1.2-0.5t1.1 0.5l3.9 3.9q0.5 0.4 0.5 1.1t-0.5 1.2z m-29.5 17.1l18.4-18.5 6.3 6.3-18.4 18.4h-6.3v-6.2z"></path></g></svg>Edit this page</a></div><div class="col lastUpdated_vwxv"></div></div></footer></article><nav class="pagination-nav docusaurus-mt-lg" aria-label="Docs pages navigation"><a class="pagination-nav__link pagination-nav__link--prev" href="/zh-CN/docs/next/modules/tubemq/client_partition_assign_introduction"><div class="pagination-nav__sublabel">Previous</div><div class="pagination-nav__label">客户端分区分配</div></a><a class="pagination-nav__link pagination-nav__link--next" href="/zh-CN/docs/next/modules/tubemq/clients_java"><div class="pagination-nav__sublabel">Next</div><div class="pagination-nav__label">JAVA SDK API</div></a></nav></div></div><div class="col col--3"><div class="tableOfContents_bqdL thin-scrollbar theme-doc-toc-desktop"><ul class="table-of-contents table-of-contents__left-border"><li><a href="#1-总体介绍" class="table-of-contents__link toc-highlight">1 总体介绍:</a></li><li><a href="#2-pb格式编码" class="table-of-contents__link toc-highlight">2 PB格式编码:</a></li><li><a href="#3-客户端的pb请求响应交互图" class="table-of-contents__link toc-highlight">3 客户端的PB请求响应交互图:</a><ul><li><a href="#31-producer交互图" class="table-of-contents__link toc-highlight">3.1 
Producer交互图:</a></li><li><a href="#32-consumer交互图" class="table-of-contents__link toc-highlight">3.2 Consumer交互图:</a></li></ul></li><li><a href="#4-客户端功能集合" class="table-of-contents__link toc-highlight">4 客户端功能集合:</a></li><li><a href="#5-客户端功能-casebycase-实现介绍" class="table-of-contents__link toc-highlight">5 客户端功能 CaseByCase 实现介绍:</a><ul><li><a href="#51-客户端与服务器端-rpc-交互过程" class="table-of-contents__link toc-highlight">5.1 客户端与服务器端 RPC 交互过程:</a></li><li><a href="#52-producer-到-master-注册" class="table-of-contents__link toc-highlight">5.2 Producer 到 Master 注册:</a></li><li><a href="#53-producer-到-master-保持心跳" class="table-of-contents__link toc-highlight">5.3 Producer 到 Master 保持心跳:</a></li><li><a href="#54-producer-到-master-关闭退出" class="table-of-contents__link toc-highlight">5.4 Producer 到 Master 关闭退出:</a></li><li><a href="#55-producer-到-broker-发送消息" class="table-of-contents__link toc-highlight">5.5 Producer 到 Broker 发送消息:</a></li><li><a href="#56-分区负载均衡过程" class="table-of-contents__link toc-highlight">5.6 分区负载均衡过程:</a></li></ul></li></ul></div></div></div></div></main></div></div><footer class="footer"><div class="container container-fluid"><div class="row footer__links"><div class="col footer__col"><div class="footer__title">事件</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://www.apachecon.com/" target="_blank" rel="noopener noreferrer" class="footer__link-item">ApacheCon<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a class="acevent" data-format="square" data-mode="dark" data-event="random"></a></li></ul></div><div class="col footer__col"><div class="footer__title">社区</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://twitter.com/ApacheInlong" target="_blank" 
rel="noopener noreferrer" class="footer__link-item">Twitter<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://inlong.apache.org/img/apache-inlong-wechat.jpg" target="_blank" rel="noopener noreferrer" class="footer__link-item">WeChat<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="mailto:dev@inlong.apache.org" target="_blank" rel="noopener noreferrer" class="footer__link-item">Email</a></li></ul></div><div class="col footer__col"><div class="footer__title">更多</div><ul class="footer__items clean-list"><li class="footer__item"><a class="footer__link-item" href="/zh-CN/blog">Blog</a></li><li class="footer__item"><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li></ul></div></div><div class="footer__bottom text--center"><div class="margin-bottom--sm"><img src="/zh-CN/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--light_HNdA footer__logo"><img src="/zh-CN/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--dark_i4oU footer__logo"></div><div class="footer__copyright"><div style="font-family: Avenir-Medium;font-size: 14px;color: #999;">
<div>Copyright © 2020-2024 The Apache Software Foundation. Licensed under the Apache License, Version 2.0.</div>
<div style="margin-top: 20px; padding-top: 20px; border-top: 1px solid #666;line-height: 20px;">Apache InLong, InLong, Apache, the Apache feather, and the Apache InLong project logo are either registered trademarks or trademarks of The Apache Software Foundation.</div>
</div></div></div></div></footer></div>
<script src="/zh-CN/assets/js/runtime~main.933f32cb.js"></script>
<script src="/zh-CN/assets/js/main.32851df5.js"></script>
</body>
</html>