blob: 2c604215b6dd2acd7ed95eb1490569e07a4521b4 [file] [log] [blame]
<!doctype html>
<html lang="zh-CN" dir="ltr" class="docs-wrapper docs-doc-page docs-version-current plugin-docs plugin-id-default docs-doc-id-modules/tubemq/clients_java">
<head>
<meta charset="UTF-8">
<meta name="generator" content="Docusaurus v2.3.1">
<title data-rh="true">JAVA SDK API | Apache InLong</title><meta data-rh="true" name="viewport" content="width=device-width,initial-scale=1"><meta data-rh="true" name="twitter:card" content="summary_large_image"><meta data-rh="true" property="og:url" content="https://inlong.apache.org/zh-CN/docs/next/modules/tubemq/clients_java"><meta data-rh="true" name="docusaurus_locale" content="zh-CN"><meta data-rh="true" name="docsearch:language" content="zh-CN"><meta data-rh="true" name="docusaurus_version" content="current"><meta data-rh="true" name="docusaurus_tag" content="docs-default-current"><meta data-rh="true" name="docsearch:version" content="current"><meta data-rh="true" name="docsearch:docusaurus_tag" content="docs-default-current"><meta data-rh="true" property="og:title" content="JAVA SDK API | Apache InLong"><meta data-rh="true" name="description" content="1 基础对象接口介绍:"><meta data-rh="true" property="og:description" content="1 基础对象接口介绍:"><link data-rh="true" rel="icon" href="/zh-CN/img/logo.svg"><link data-rh="true" rel="canonical" href="https://inlong.apache.org/zh-CN/docs/next/modules/tubemq/clients_java"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/next/modules/tubemq/clients_java" hreflang="en"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/zh-CN/docs/next/modules/tubemq/clients_java" hreflang="zh-CN"><link data-rh="true" rel="alternate" href="https://inlong.apache.org/docs/next/modules/tubemq/clients_java" hreflang="x-default"><link data-rh="true" rel="preconnect" href="https://YUW9QEL53E-dsn.algolia.net" crossorigin="anonymous"><link rel="alternate" type="application/rss+xml" href="/zh-CN/blog/rss.xml" title="Apache InLong RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/zh-CN/blog/atom.xml" title="Apache InLong Atom Feed">
<link rel="search" type="application/opensearchdescription+xml" title="Apache InLong" href="/zh-CN/opensearch.xml">
<script src="https://www.apachecon.com/event-images/snippet.js" async></script><link rel="stylesheet" href="/zh-CN/assets/css/styles.09deabdb.css">
<link rel="preload" href="/zh-CN/assets/js/runtime~main.933f32cb.js" as="script">
<link rel="preload" href="/zh-CN/assets/js/main.32851df5.js" as="script">
</head>
<body class="navigation-with-keyboard">
<script>!function(){function e(e){document.documentElement.setAttribute("data-theme",e)}var t=function(){var e=null;try{e=localStorage.getItem("theme")}catch(e){}return e}();null!==t?e(t):window.matchMedia("(prefers-color-scheme: dark)").matches?e("dark"):(window.matchMedia("(prefers-color-scheme: light)").matches,e("light"))}()</script><div id="__docusaurus">
<div role="region" aria-label="Skip to main content"><a class="skipToContent_fXgn" href="#docusaurus_skipToContent_fallback">Skip to main content</a></div><nav aria-label="主导航" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="切换导航栏" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/zh-CN/"><div class="navbar__logo"><img src="/zh-CN/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--light_HNdA"><img src="/zh-CN/img/logo.svg" alt="Apache" class="themedImage_ToTc themedImage--dark_i4oU"></div><b class="navbar__title text--truncate">Apache InLong</b></a></div><div class="navbar__items navbar__items--right"><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a class="navbar__link" aria-haspopup="true" aria-expanded="false" role="button" href="/zh-CN/docs/introduction">文档</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/zh-CN/docs/next/introduction">Next</a></li><li><a class="dropdown__link" href="/zh-CN/docs/introduction">1.11.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.10.0/introduction">1.10.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.9.0/introduction">1.9.0</a></li><li><a class="dropdown__link" href="/zh-CN/docs/1.8.0/introduction">1.8.0</a></li><li><a class="dropdown__link" href="/zh-CN/versions/">All versions</a></li></ul></div><a class="navbar__item navbar__link" href="/zh-CN/downloads">下载</a><a class="navbar__item navbar__link" href="/zh-CN/community/how-to-contribute">社区</a><a class="navbar__item navbar__link" href="/zh-CN/blog">博客</a><a class="navbar__item navbar__link" href="/zh-CN/team">团队</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Apache</a><ul class="dropdown__menu"><li><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Apache Software Foundation</a></li><li><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener noreferrer" class="dropdown__link">证书</a></li><li><a href="https://www.apache.org/events/current-event" target="_blank" rel="noopener noreferrer" class="dropdown__link">事件</a></li><li><a href="https://www.apache.org/security/" target="_blank" rel="noopener noreferrer" class="dropdown__link">安全</a></li><li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">赞助</a></li><li><a href="https://www.apache.org/foundation/policies/privacy.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Privacy</a></li><li><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">致谢</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link"><svg viewBox="0 0 24 24" width="20" height="20" aria-hidden="true" class="iconLanguage_nlXk"><path fill="currentColor" d="M12.87 15.07l-2.54-2.51.03-.03c1.74-1.94 2.98-4.17 3.71-6.53H17V4h-7V2H8v2H1v1.99h11.17C11.5 7.92 10.44 9.75 9 11.35 8.07 10.32 7.3 9.19 6.69 8h-2c.73 1.63 1.73 3.17 2.98 4.56l-5.09 5.02L4 19l5-5 3.11 3.11.76-2.04zM18.5 10h-2L12 22h2l1.12-3h4.75L21 22h2l-4.5-12zm-2.62 7l1.62-4.33L19.12 17h-3.24z"></path></svg>简体中文</a><ul class="dropdown__menu"><li><a href="/docs/next/modules/tubemq/clients_java" target="_self" rel="noopener noreferrer" class="dropdown__link" lang="en">English</a></li><li><a href="/zh-CN/docs/next/modules/tubemq/clients_java" target="_self" rel="noopener noreferrer" class="dropdown__link dropdown__link--active" lang="zh-CN">简体中文</a></li></ul></div><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a><div class="toggle_vylO colorModeToggle_DEke"><button class="clean-btn toggleButton_gllP toggleButtonDisabled_aARS" type="button" disabled="" title="切换浅色/暗黑模式(当前为浅色模式)" aria-label="切换浅色/暗黑模式(当前为浅色模式)" aria-live="polite"><svg viewBox="0 0 24 24" width="24" height="24" class="lightToggleIcon_pyhR"><path fill="currentColor" d="M12,9c1.65,0,3,1.35,3,3s-1.35,3-3,3s-3-1.35-3-3S10.35,9,12,9 M12,7c-2.76,0-5,2.24-5,5s2.24,5,5,5s5-2.24,5-5 S14.76,7,12,7L12,7z M2,13l2,0c0.55,0,1-0.45,1-1s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S1.45,13,2,13z M20,13l2,0c0.55,0,1-0.45,1-1 s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S19.45,13,20,13z M11,2v2c0,0.55,0.45,1,1,1s1-0.45,1-1V2c0-0.55-0.45-1-1-1S11,1.45,11,2z M11,20v2c0,0.55,0.45,1,1,1s1-0.45,1-1v-2c0-0.55-0.45-1-1-1C11.45,19,11,19.45,11,20z M5.99,4.58c-0.39-0.39-1.03-0.39-1.41,0 c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0s0.39-1.03,0-1.41L5.99,4.58z M18.36,16.95 c-0.39-0.39-1.03-0.39-1.41,0c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0c0.39-0.39,0.39-1.03,0-1.41 L18.36,16.95z M19.42,5.99c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06c-0.39,0.39-0.39,1.03,0,1.41 s1.03,0.39,1.41,0L19.42,5.99z M7.05,18.36c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06 c-0.39,0.39-0.39,1.03,0,1.41s1.03,0.39,1.41,0L7.05,18.36z"></path></svg><svg viewBox="0 0 24 24" width="24" height="24" class="darkToggleIcon_wfgR"><path fill="currentColor" d="M9.37,5.51C9.19,6.15,9.1,6.82,9.1,7.5c0,4.08,3.32,7.4,7.4,7.4c0.68,0,1.35-0.09,1.99-0.27C17.45,17.19,14.93,19,12,19 c-3.86,0-7-3.14-7-7C5,9.07,6.81,6.55,9.37,5.51z M12,3c-4.97,0-9,4.03-9,9s4.03,9,9,9s9-4.03,9-9c0-0.46-0.04-0.92-0.1-1.36 c-0.98,1.37-2.58,2.26-4.4,2.26c-2.98,0-5.4-2.42-5.4-5.4c0-1.81,0.89-3.42,2.26-4.4C12.92,3.04,12.46,3,12,3L12,3z"></path></svg></button></div><div class="searchBox_ZlJk"><button type="button" class="DocSearch DocSearch-Button" aria-label="搜索"><span class="DocSearch-Button-Container"><svg width="20" height="20" class="DocSearch-Search-Icon" viewBox="0 0 20 20"><path d="M14.386 14.386l4.0877 4.0877-4.0877-4.0877c-2.9418 2.9419-7.7115 2.9419-10.6533 0-2.9419-2.9418-2.9419-7.7115 0-10.6533 2.9418-2.9419 7.7115-2.9419 10.6533 0 2.9419 2.9418 2.9419 7.7115 0 10.6533z" stroke="currentColor" fill="none" fill-rule="evenodd" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">搜索</span></span><span class="DocSearch-Button-Keys"></span></button></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0 docsWrapper_BCFX"><button aria-label="回到顶部" class="clean-btn theme-back-to-top-button backToTopButton_sjWU" type="button"></button><div class="docPage__5DB"><aside class="theme-doc-sidebar-container docSidebarContainer_b6E3"><div class="sidebarViewport_Xe31"><div class="sidebar_njMd"><nav aria-label="文档侧边栏" class="menu thin-scrollbar menu_SIkG"><ul class="theme-doc-sidebar-menu menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/zh-CN/docs/next/introduction">简介</a></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/design_and_concept/basic_concept">设计和概念</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/quick_start/how_to_build">快速开始</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/deployment/standalone">安装部署</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" href="/zh-CN/docs/next/modules/agent/overview">组件介绍</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/agent/overview">Agent</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/dataproxy/overview">DataProxy</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/overview">TubeMQ</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/overview">总览</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/quick_start">快速开始</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/producer_example">生产者示例</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/consumer_example">消费者示例</a></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-3 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/tubemq-manager/overview">tubemq-manager</a></div></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/commandline_tools">命令行工具</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/client_partition_assign_introduction">客户端分区分配</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/client_rpc">客户端RPC</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link menu__link--active" aria-current="page" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/clients_java">JAVA SDK API</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/configure_introduction">配置参数</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/console_introduction">TubeMQ管控台操作指引</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/error_code">错误码定义</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/http_access_api">HTTP API介绍</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/tubemq_metrics">TubeMQ指标</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-3 menu__list-item"><a class="menu__link" tabindex="0" href="/zh-CN/docs/next/modules/tubemq/tubemq_perf_test_vs_Kafka">TubeMQ VS Kafka</a></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/sort/overview">Sort</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/manager/overview">Manager</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/dashboard/overview">Dashboard</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/sort-standalone/overview">Sort Standalone</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-2 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" tabindex="0" href="/zh-CN/docs/next/modules/audit/overview">Audit</a></div></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/data_node/extract_node/overview">数据节点</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/sdk/dataproxy-sdk/cpp">SDK</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/user_guide/dashboard_usage">用户指引</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/development/inlong_msg">开发指引</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh-CN/docs/next/administration/user_management">Administration</a></div></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-1 menu__list-item"><a class="menu__link" href="/zh-CN/docs/next/contact">联系我们</a></li></ul></nav></div></div></aside><main class="docMainContainer_gTbr"><div class="container padding-top--md padding-bottom--lg"><div class="row"><div class="col docItemCol_VOVn"><div class="theme-doc-version-banner alert alert--warning margin-bottom--md" role="alert"><div>This is unreleased documentation for <!-- -->Apache InLong<!-- --> <b>Next</b> version.</div><div class="margin-top--md">For up-to-date documentation, see the <b><a href="/zh-CN/docs/modules/tubemq/clients_java">latest version</a></b> (<!-- -->1.11.0<!-- -->).</div></div><div class="docItemContainer_Djhp"><article><nav class="theme-doc-breadcrumbs breadcrumbsContainer_Z_bl" aria-label="页面路径"><ul class="breadcrumbs" itemscope="" itemtype="https://schema.org/BreadcrumbList"><li class="breadcrumbs__item"><a aria-label="主页面" class="breadcrumbs__link" href="/zh-CN/"><svg viewBox="0 0 24 24" class="breadcrumbHomeIcon_YNFT"><path d="M10 19v-5h4v5c0 .55.45 1 1 1h3c.55 0 1-.45 1-1v-7h1.7c.46 0 .68-.57.33-.87L12.67 3.6c-.38-.34-.96-.34-1.34 0l-8.36 7.53c-.34.3-.13.87.33.87H5v7c0 .55.45 1 1 1h3c.55 0 1-.45 1-1z" fill="currentColor"></path></svg></a></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">组件介绍</span><meta itemprop="position" content="1"></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">TubeMQ</span><meta itemprop="position" content="2"></li><li itemscope="" itemprop="itemListElement" itemtype="https://schema.org/ListItem" class="breadcrumbs__item breadcrumbs__item--active"><span class="breadcrumbs__link" itemprop="name">JAVA SDK API</span><meta itemprop="position" content="3"></li></ul></nav><span class="theme-doc-version-badge badge badge--secondary">版本:Next</span><div class="tocCollapsible_ETCw theme-doc-toc-mobile tocMobile_ITEo"><button type="button" class="clean-btn tocCollapsibleButton_TO0P">On this page</button></div><div class="theme-doc-markdown markdown"><header><h1>JAVA SDK API</h1></header><h2 class="anchor anchorWithStickyNavbar_LWe7" id="1-基础对象接口介绍">1 基础对象接口介绍:<a href="#1-基础对象接口介绍" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="11-messagesessionfactory消息会话工厂">1.1 MessageSessionFactory(消息会话工厂):<a href="#11-messagesessionfactory消息会话工厂" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>TubeMQ 采用MessageSessionFactory(消息会话工厂)来管理网络连接,又根据业务不同客户端是否复用连接细分为TubeSingleSessionFactory(单连接会话工厂)类和TubeMultiSessionFactory(多连接会话工厂)类2个部分,其实现逻辑大家可以从代码可以看到,单连接会话通过定义clientFactory静态类,实现了进程内不同客户端连接相同目标服务器时底层物理连接只建立一条的特征,多连接会话里定义的clientFactory为非静态类,从而实现同进程内通过不同会话工厂,创建的客户端所属的连接会话不同建立不同的物理连接。通过这种构造解决连接创建过多的问题,业务可以根据自身需要可以选择不同的消息会话工厂类,一般情况下我们使用单连接会话工厂类。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="12-masterinfo">1.2 MasterInfo:<a href="#12-masterinfo" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>TubeMQ的Master地址信息对象,该对象的特点是支持配置多个Master地址,由于TubeMQ Master借助BDB的存储能力进行元数据管理,以及服务HA热切能力,Master的地址相应地就需要配置多条信息。该配置信息支持IP、域名两种模式,由于TubeMQ的HA是热切模式,客户端要保证到各个Master地址都是连通的。该信息在初始化TubeClientConfig类对象和ConsumerConfig类对象时使用,考虑到配置的方便性,我们将多条Master地址构造成“ip1:port1,ip2:port2,ip3:port3”格式并进行解析。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="13-tubeclientconfig">1.3 TubeClientConfig:<a href="#13-tubeclientconfig" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>MessageSessionFactory(消息会话工厂)初始化类,用来携带创建网络连接信息、客户端控制参数信息的对象类,包括RPC时长设置、Socket属性设置、连接质量检测参数设置、TLS参数设置、认证授权信息设置等信息。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="14-consumerconfig">1.4 ConsumerConfig:<a href="#14-consumerconfig" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>ConsumerConfig类是TubeClientConfig类的子类,它是在TubeClientConfig类基础上增加了Consumer类对象初始化时候的参数携带,因而在一个既有Producer又有Consumer的MessageSessionFactory(消息会话工厂)类对象里,会话工厂类的相关设置以MessageSessionFactory类初始化的内容为准,Consumer类对象按照创建时传递的初始化类对象为准。在consumer里又根据消费行为的不同分为Pull消费者和Push消费者两种,两种特有的参数通过参数接口携带“pull”或“push”不同特征进行区分。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="15-message">1.5 Message:<a href="#15-message" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>Message类是TubeMQ里传递的消息对象类,业务设置的data会从生产端原样传递给消息接收端,attribute内容是与TubeMQ系统共用的字段,业务填写的内容不会丢失和改写,但该字段有可能会新增TubeMQ系统填写的内容,并在后续的版本中,新增的TubeMQ系统内容有可能去掉而不被通知。该部分需要注意的是Message.putSystemHeader(final String msgType, final String msgTime)接口,该接口用来设置消息的消息类型和消息发送时间,msgType用于消费端过滤用,msgTime用做TubeMQ进行数据收发统计时消息时间统计维度用。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="16-messageproducer">1.6 MessageProducer:<a href="#16-messageproducer" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>消息生产者类,该类完成消息的生产,消息发送分为同步发送和异步发送两种接口,目前消息采用Round Robin方式发往后端服务器,后续这块将考虑按照业务指定的算法进行后端服务器选择方式进行生产。该类使用时需要注意的是,我们支持在初始化时候全量Topic指定的publish,也支持在生产过程中临时增加对新的Topic的publish,但临时增加的Topic不会立即生效,因而在使用新增Topic前,要先调用isTopicCurAcceptPublish接口查询该Topic是否已publish并且被服务器接受,否则有可能消息发送失败。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="17-messageconsumer">1.7 MessageConsumer:<a href="#17-messageconsumer" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>该类有两个子类PullMessageConsumer、PushMessageConsumer,通过这两个子类的包装,完成了对业务侧的Pull和Push语义。实际上TubeMQ是采用Pull模式与后端服务进行交互,为了便于业务的接口使用,我们进行了封装,大家可以看到其差别在于Push在启动时初始化了一个线程组,来完成主动的数据拉取操作。需要注意的地方在于:</p><ul><li>a. CompleteSubscribe接口,带参数的接口支持客户端对指定的分区进行指定offset消费,不带参数的接口则按照ConsumerConfig.setConsumeModel(int consumeModel)接口进行对应的消费模式设置来消费数据;</li><li>b. 对subscribe接口,其用来定义该消费者的消费目标,而filterConds参数表示对待消费的Topic是否进行过滤消费,以及如果做过滤消费时要过滤的msgType消息类型值。如果不需要进行过滤消费,则该参数填为null,或者空的集合值。</li></ul><hr><h2 class="anchor anchorWithStickyNavbar_LWe7" id="2-接口调用示例">2 接口调用示例:<a href="#2-接口调用示例" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="21-环境准备">2.1 环境准备:<a href="#21-环境准备" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>TubeMQ开源包org.apache.inlong.tubemq.example里提供了生产和消费的具体代码示例,这里我们通过一个实际的例子来介绍如何填参和调用对应接口。首先我们搭建一个带3个Master节点的TubeMQ集群,3个Master地址及端口分别为test_1.domain.com,test_2.domain.com,test_3.domain.com,端口均为8080,在该集群里我们建立了若干个Broker,并且针对Broker我们创建了3个topic:topic_1,topic_2,topic_3等Topic配置;然后我们启动对应的Broker等待Consumer和Producer的创建。</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="22-创建consumer">2.2 创建Consumer:<a href="#22-创建consumer" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>见包org.apache.inlong.tubemq.example.MessageConsumerExample类文件,Consumer是一个包含网络交互协调的客户端对象,需要做初始化并且长期驻留内存重复使用的模型,它不适合单次拉起消费的场景。如下图示,我们定义了MessageConsumerExample封装类,在该类中定义了进行网络交互的会话工厂MessageSessionFactory类,以及用来做Push消费的PushMessageConsumer类:</p><h4 class="anchor anchorWithStickyNavbar_LWe7" id="221-初始化messageconsumerexample类">2.2.1 初始化MessageConsumerExample类:<a href="#221-初始化messageconsumerexample类" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h4><ol><li><p>首先构造一个ConsumerConfig类,填写初始化信息,包括本机IP V4地址,Master集群地址,消费组组名信息,这里Master地址信息传入值为:”test_1.domain.com:8080,test_2.domain.com:8080,test_3.domain.com:8080”;</p></li><li><p>然后设置消费模式:我们设置首次从队列尾消费,后续接续消费模式;</p></li><li><p>然后设置Push消费时回调函数个数</p></li><li><p>进行会话工厂初始化操作:该场景里我们选择建立单链接的会话工厂;</p></li><li><p>在会话工厂创建模式的消费者:</p></li></ol><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">public final class MessageConsumerExample {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private static final Logger logger = LoggerFactory.getLogger(MessageConsumerExample.class);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private static final MsgRecvStats msgRecvStats = new MsgRecvStats();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private final String masterHostAndPort;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private final String localHost;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private final String group;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private PushMessageConsumer messageConsumer;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private MessageSessionFactory messageSessionFactory;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> </span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public MessageConsumerExample(String localHost, String masterHostAndPort, String group, int fetchCount)</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> throws Exception {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.localHost = localHost;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.masterHostAndPort = masterHostAndPort;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.group = group;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> ConsumerConfig consumerConfig = new ConsumerConfig(this.localHost,this.masterHostAndPort, this.group);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> consumerConfig.setConsumeModel(0);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (fetchCount &gt; 0) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> consumerConfig.setPushFetchThreadCnt(fetchCount);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h4 class="anchor anchorWithStickyNavbar_LWe7" id="222-订阅topic">2.2.2 订阅Topic:<a href="#222-订阅topic" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h4><p>我们没有采用指定Offset消费的模式进行订阅,也没有过滤需求,因而我们在如下代码里只做了Topic的指定,对应的过滤项集合我们传的是null值,同时,对于不同的Topic,我们可以传递不同的消息回调处理函数;我们这里订阅了3个topic,topic_1,topic_2,topic_3,每个topic分别调用subscribe函数进行对应参数设置:</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">public void subscribe(final Map&lt;String, TreeSet&lt;String&gt;&gt; topicStreamIdMap) throws TubeClientException {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> for (Map.Entry&lt;String, TreeSet&lt;String&gt;&gt; entry : topicStreamIdMap.entrySet()) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.messageConsumer.subscribe(entry.getKey(), entry.getValue(),</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> new DefaultMessageListener(entry.getKey()));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> messageConsumer.completeSubscribe();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h4 class="anchor anchorWithStickyNavbar_LWe7" id="223-进行消费">2.2.3 进行消费:<a href="#223-进行消费" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h4><p>到此,对集群里对应topic的订阅就已完成,系统运行开始后,回调函数里数据将不断的通过回调函数推送到业务层进行处理:</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">public class DefaultMessageListener implements MessageListener {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private String topic;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public DefaultMessageListener(String topic) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.topic = topic;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public void receiveMessages(PeerInfo peerInfo, final List&lt;Message&gt; messages) throws InterruptedException {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (messages != null &amp;&amp; !messages.isEmpty()) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> msgRecvStats.addMsgCount(this.topic, messages.size());</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public Executor getExecutor() {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> return null;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public void stop() {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="3-创建producer">3 创建Producer:<a href="#3-创建producer" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>现网环境中业务的数据都是通过代理层来做接收汇聚,包装了比较多的异常处理,大部分的业务都没有也不会接触到TubeSDK的Producer类,考虑到业务自己搭建集群使用TubeMQ进行使用的场景,这里提供对应的使用demo,见包org.apache.inlong.tubemq.example.MessageProducerExample类文件供参考,<strong>需要注意</strong>的是,业务除非使用数据平台的TubeMQ集群做MQ服务,否则仍要按照现网的接入流程使用代理层来进行数据生产:</p><h3 class="anchor anchorWithStickyNavbar_LWe7" id="31-初始化messageproducerexample类">3.1 初始化MessageProducerExample类:<a href="#31-初始化messageproducerexample类" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>和Consumer的初始化类似,也是构造了一个封装类,定义了一个会话工厂,以及一个Producer类,生产端的会话工厂初始化通过TubeClientConfig类进行,如之前所介绍的,ConsumerConfig类是TubeClientConfig类的子类,虽然传入参数不同,但会话工厂是通过TubeClientConfig类完成的初始化处理:</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">public final class MessageProducerExample {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private static final Logger logger = LoggerFactory.getLogger(MessageProducerExample.class);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private static final ConcurrentHashMap&lt;String, AtomicLong&gt; counterMap =</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> new ConcurrentHashMap&lt;String, AtomicLong&gt;();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> String[] arrayKey = {&quot;aaa&quot;, &quot;bbb&quot;, &quot;ac&quot;, &quot;dd&quot;, &quot;eee&quot;, &quot;fff&quot;, &quot;gggg&quot;, &quot;hhhh&quot;};</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private MessageProducer messageProducer;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private TreeSet&lt;String&gt; filters = new TreeSet&lt;&gt;();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private int keyCount = 0;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private int sentCount = 0;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> private MessageSessionFactory messageSessionFactory;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> public MessageProducerExample(final String localHost, final String masterHostAndPort) throws Exception {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> filters.add(&quot;aaa&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> filters.add(&quot;bbb&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> TubeClientConfig clientConfig = new TubeClientConfig(localHost, masterHostAndPort);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.messageSessionFactory = new TubeSingleSessionFactory(clientConfig);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.messageProducer = this.messageSessionFactory.createProducer();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h3 class="anchor anchorWithStickyNavbar_LWe7" id="32-发布topic">3.2 发布Topic:<a href="#32-发布topic" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">public void publishTopics(List&lt;String&gt; topicList) throws TubeClientException {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> this.messageProducer.publish(new TreeSet&lt;String&gt;(topicList));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h3 class="anchor anchorWithStickyNavbar_LWe7" id="33-进行数据生产">3.3 进行数据生产:<a href="#33-进行数据生产" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>如下所示,则为具体的数据构造和发送逻辑,构造一个Message对象后调用sendMessage()函数发送即可,有同步接口和异步接口选择,依照业务要求选择不同接口;需要注意的是该业务根据不同消息调用message.putSystemHeader()函数设置消息的过滤属性和发送时间,便于系统进行消息过滤消费,以及指标统计用。完成这些,一条消息即被发送出去,如果返回结果为成功,则消息被成功的接纳并且进行消息处理,如果返回失败,则业务根据具体错误码及错误提示进行判断处理,相关错误详情见《TubeMQ错误信息介绍.xlsx》:</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#F8F8F2;--prism-background-color:#282A36"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#F8F8F2"><span class="token plain">public void sendMessageAsync(int id, long currtime, String topic, byte[] body, MessageSentCallback callback) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> Message message = new Message(topic, body);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> SimpleDateFormat sdf = new SimpleDateFormat(&quot;yyyyMMddHHmm&quot;);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> long currTimeMillis = System.currentTimeMillis();</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> message.setAttrKeyVal(&quot;index&quot;, String.valueOf(1));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> String keyCode = arrayKey[sentCount++ % arrayKey.length];</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> message.putSystemHeader(keyCode, sdf.format(new Date(currTimeMillis))); </span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> if (filters.contains(keyCode)) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> keyCount++;</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> try {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> message.setAttrKeyVal(&quot;dataTime&quot;, String.valueOf(currTimeMillis));</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> messageProducer.sendMessage(message, callback);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> } catch (TubeClientException e) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> logger.error(&quot;Send message failed!&quot;, e);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> } catch (InterruptedException e) {</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> logger.error(&quot;Send message failed!&quot;, e);</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#F8F8F2"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg class="copyButtonIcon_y97N" viewBox="0 0 24 24"><path d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg class="copyButtonSuccessIcon_LjdS" viewBox="0 0 24 24"><path d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h3 class="anchor anchorWithStickyNavbar_LWe7" id="34-producer不同类mamessageproducerexample关注点">3.4 Producer不同类MAMessageProducerExample关注点:<a href="#34-producer不同类mamessageproducerexample关注点" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>该类初始化与MessageProducerExample类不同,采用的是TubeMultiSessionFactory多会话工厂类进行的连接初始化,该demo提供了如何使用多会话工厂类的特性,可以用于通过多个物理连接提升系统吞吐量的场景(TubeMQ通过连接复用模式来减少物理连接资源的使用),恰当使用可以提升系统的生产性能。在Consumer侧也可以通过多会话工厂进行初始化,但考虑到消费是长时间过程处理,对连接资源的占用比较小,消费场景不推荐使用。</p><p>至此,整个生产和消费的示例已经介绍完,你可以下载代码并编译运行,看看是不是这么简单😊</p><hr><a href="#top">Back to top</a></div><footer class="theme-doc-footer docusaurus-mt-lg"><div class="theme-doc-footer-edit-meta-row row"><div class="col"><a href="https://github.com/apache/inlong-website/edit/master/docs/modules/tubemq/clients_java.md" target="_blank" rel="noreferrer noopener" class="theme-edit-this-page"><svg fill="currentColor" height="20" width="20" viewBox="0 0 40 40" class="iconEdit_Z9Sw" aria-hidden="true"><g><path d="m34.5 11.7l-3 3.1-6.3-6.3 3.1-3q0.5-0.5 1.2-0.5t1.1 0.5l3.9 3.9q0.5 0.4 0.5 1.1t-0.5 1.2z m-29.5 17.1l18.4-18.5 6.3 6.3-18.4 18.4h-6.3v-6.2z"></path></g></svg>Edit this page</a></div><div class="col lastUpdated_vwxv"></div></div></footer></article><nav class="pagination-nav docusaurus-mt-lg" aria-label="Docs pages navigation"><a class="pagination-nav__link pagination-nav__link--prev" href="/zh-CN/docs/next/modules/tubemq/client_rpc"><div class="pagination-nav__sublabel">Previous</div><div class="pagination-nav__label">客户端RPC</div></a><a class="pagination-nav__link pagination-nav__link--next" href="/zh-CN/docs/next/modules/tubemq/configure_introduction"><div class="pagination-nav__sublabel">Next</div><div class="pagination-nav__label">配置参数</div></a></nav></div></div><div class="col col--3"><div class="tableOfContents_bqdL thin-scrollbar theme-doc-toc-desktop"><ul class="table-of-contents table-of-contents__left-border"><li><a href="#1-基础对象接口介绍" class="table-of-contents__link toc-highlight">1 基础对象接口介绍:</a><ul><li><a href="#11-messagesessionfactory消息会话工厂" class="table-of-contents__link toc-highlight">1.1 MessageSessionFactory(消息会话工厂):</a></li><li><a href="#12-masterinfo" class="table-of-contents__link toc-highlight">1.2 MasterInfo:</a></li><li><a href="#13-tubeclientconfig" class="table-of-contents__link toc-highlight">1.3 TubeClientConfig:</a></li><li><a href="#14-consumerconfig" class="table-of-contents__link toc-highlight">1.4 ConsumerConfig:</a></li><li><a href="#15-message" class="table-of-contents__link toc-highlight">1.5 Message:</a></li><li><a href="#16-messageproducer" class="table-of-contents__link toc-highlight">1.6 MessageProducer:</a></li><li><a href="#17-messageconsumer" class="table-of-contents__link toc-highlight">1.7 MessageConsumer:</a></li></ul></li><li><a href="#2-接口调用示例" class="table-of-contents__link toc-highlight">2 接口调用示例:</a><ul><li><a href="#21-环境准备" class="table-of-contents__link toc-highlight">2.1 环境准备:</a></li><li><a href="#22-创建consumer" class="table-of-contents__link toc-highlight">2.2 创建Consumer:</a></li></ul></li><li><a href="#3-创建producer" class="table-of-contents__link toc-highlight">3 创建Producer:</a><ul><li><a href="#31-初始化messageproducerexample类" class="table-of-contents__link toc-highlight">3.1 初始化MessageProducerExample类:</a></li><li><a href="#32-发布topic" class="table-of-contents__link toc-highlight">3.2 发布Topic:</a></li><li><a href="#33-进行数据生产" class="table-of-contents__link toc-highlight">3.3 进行数据生产:</a></li><li><a href="#34-producer不同类mamessageproducerexample关注点" class="table-of-contents__link toc-highlight">3.4 Producer不同类MAMessageProducerExample关注点:</a></li></ul></li></ul></div></div></div></div></main></div></div><footer class="footer"><div class="container container-fluid"><div class="row footer__links"><div class="col footer__col"><div class="footer__title">事件</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://www.apachecon.com/" target="_blank" rel="noopener noreferrer" class="footer__link-item">ApacheCon<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a class="acevent" data-format="square" data-mode="dark" data-event="random"></a></li></ul></div><div class="col footer__col"><div class="footer__title">社区</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://twitter.com/ApacheInlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">Twitter<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://inlong.apache.org/img/apache-inlong-wechat.jpg" target="_blank" rel="noopener noreferrer" class="footer__link-item">WeChat<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="mailto:dev@inlong.apache.org" target="_blank" rel="noopener noreferrer" class="footer__link-item">Email</a></li></ul></div><div class="col footer__col"><div class="footer__title">更多</div><ul class="footer__items clean-list"><li class="footer__item"><a class="footer__link-item" href="/zh-CN/blog">Blog</a></li><li class="footer__item"><a href="https://github.com/apache/inlong" target="_blank" rel="noopener noreferrer" class="footer__link-item">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li></ul></div></div><div class="footer__bottom text--center"><div class="margin-bottom--sm"><img src="/zh-CN/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--light_HNdA footer__logo"><img src="/zh-CN/img/asf_logo.svg" alt="Apache InLong" class="themedImage_ToTc themedImage--dark_i4oU footer__logo"></div><div class="footer__copyright"><div style="font-family: Avenir-Medium;font-size: 14px;color: #999;">
<div>Copyright © 2020-2024 The Apache Software Foundation. Licensed under the Apache License, Version 2.0.</div>
<div style="margin-top: 20px; padding-top: 20px; border-top: 1px solid #666;line-height: 20px;">The Apache Software Foundation Apache InLong, InLong, Apache, the Apache feather, and the Apache InLong project logo are either registered trademarks or trademarks of the Apache Software Foundation.</div>
</div></div></div></div></footer></div>
<script src="/zh-CN/assets/js/runtime~main.933f32cb.js"></script>
<script src="/zh-CN/assets/js/main.32851df5.js"></script>
</body>
</html>