| <div role="region" aria-label="跳到主要内容"><a class="skipToContent_fXgn" href="#__docusaurus_skipToContent_fallback">跳到主要内容</a></div><nav aria-label="主导航" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="切换导航栏" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/zh/"><div class="navbar__logo"><img src="/zh/img/Apache_RocketMQ_logo.svg.png" alt="My Site Logo" class="themedImage_ToTc themedImage--light_HNdA"><img src="/zh/img/Apache_RocketMQ_logo.svg.png" alt="My Site Logo" class="themedImage_ToTc themedImage--dark_i4oU"></div><b class="navbar__title text--truncate">Apache RocketMQ</b></a></div><div class="navbar__items navbar__items--right"><a href="https://github.com/apache/rocketmq" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link"><svg viewBox="0 0 24 24" width="20" height="20" aria-hidden="true" class="iconLanguage_nlXk"><path fill="currentColor" d="M12.87 15.07l-2.54-2.51.03-.03c1.74-1.94 2.98-4.17 3.71-6.53H17V4h-7V2H8v2H1v1.99h11.17C11.5 7.92 10.44 9.75 9 11.35 8.07 10.32 7.3 9.19 6.69 8h-2c.73 1.63 1.73 3.17 2.98 4.56l-5.09 5.02L4 19l5-5 3.11 3.11.76-2.04zM18.5 10h-2L12 22h2l1.12-3h4.75L21 22h2l-4.5-12zm-2.62 7l1.62-4.33L19.12 17h-3.24z"></path></svg>简体中文</a><ul class="dropdown__menu"><li><a href="/zh/docs/4.x/consumer/02push" target="_self" rel="noopener noreferrer" class="dropdown__link dropdown__link--active" lang="zh">简体中文</a></li><li><a href="/docs/4.x/consumer/02push" target="_self" rel="noopener noreferrer" class="dropdown__link" lang="en">English</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">文档</a><ul class="dropdown__menu"><li><a aria-current="page" class="dropdown__link dropdown__link--active" href="/zh/docs/">5.0</a></li><li><a aria-current="page" class="dropdown__link dropdown__link--active" href="/zh/docs/4.x/">4.x</a></li></ul></div><a class="navbar__item navbar__link" href="/zh/download">下载</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">博客</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/zh/blog">用户案例</a></li><li><a class="dropdown__link" href="/zh/events">社区活动</a></li><li><a class="dropdown__link" href="/zh/release-notes">版本变化</a></li><li><a class="dropdown__link" href="/zh/news">RocketMQ新闻</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">社区</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/zh/contact">参与社区</a></li><li><a class="dropdown__link" href="/zh/origin">项目起源</a></li><li><a class="dropdown__link" href="/zh/team">贡献团队</a></li><li><a class="dropdown__link" href="/zh/docs/contributionGuide/01how-to-contribute">贡献说明</a></li><li><a class="dropdown__link" href="/zh/enterprise">企业用户</a></li></ul></div><div class="toggle_vylO colorModeToggle_DEke"><button class="clean-btn toggleButton_gllP toggleButtonDisabled_aARS" type="button" disabled="" title="切换浅色/暗黑模式(当前为浅色模式)" aria-label="切换浅色/暗黑模式(当前为浅色模式)" aria-live="polite"><svg viewBox="0 0 24 24" width="24" height="24" class="lightToggleIcon_pyhR"><path fill="currentColor" d="M12,9c1.65,0,3,1.35,3,3s-1.35,3-3,3s-3-1.35-3-3S10.35,9,12,9 M12,7c-2.76,0-5,2.24-5,5s2.24,5,5,5s5-2.24,5-5 S14.76,7,12,7L12,7z M2,13l2,0c0.55,0,1-0.45,1-1s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S1.45,13,2,13z M20,13l2,0c0.55,0,1-0.45,1-1 s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S19.45,13,20,13z M11,2v2c0,0.55,0.45,1,1,1s1-0.45,1-1V2c0-0.55-0.45-1-1-1S11,1.45,11,2z M11,20v2c0,0.55,0.45,1,1,1s1-0.45,1-1v-2c0-0.55-0.45-1-1-1C11.45,19,11,19.45,11,20z M5.99,4.58c-0.39-0.39-1.03-0.39-1.41,0 c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0s0.39-1.03,0-1.41L5.99,4.58z M18.36,16.95 c-0.39-0.39-1.03-0.39-1.41,0c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0c0.39-0.39,0.39-1.03,0-1.41 L18.36,16.95z M19.42,5.99c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06c-0.39,0.39-0.39,1.03,0,1.41 s1.03,0.39,1.41,0L19.42,5.99z M7.05,18.36c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06 c-0.39,0.39-0.39,1.03,0,1.41s1.03,0.39,1.41,0L7.05,18.36z"></path></svg><svg viewBox="0 0 24 24" width="24" height="24" class="darkToggleIcon_wfgR"><path fill="currentColor" d="M9.37,5.51C9.19,6.15,9.1,6.82,9.1,7.5c0,4.08,3.32,7.4,7.4,7.4c0.68,0,1.35-0.09,1.99-0.27C17.45,17.19,14.93,19,12,19 c-3.86,0-7-3.14-7-7C5,9.07,6.81,6.55,9.37,5.51z M12,3c-4.97,0-9,4.03-9,9s4.03,9,9,9s9-4.03,9-9c0-0.46-0.04-0.92-0.1-1.36 c-0.98,1.37-2.58,2.26-4.4,2.26c-2.98,0-5.4-2.42-5.4-5.4c0-1.81,0.89-3.42,2.26-4.4C12.92,3.04,12.46,3,12,3L12,3z"></path></svg></button></div><div class="searchBox_ZlJk"><button type="button" class="DocSearch DocSearch-Button" aria-label="搜索"><span class="DocSearch-Button-Container"><svg width="20" height="20" class="DocSearch-Search-Icon" viewBox="0 0 20 20" aria-hidden="true"><path d="M14.386 14.386l4.0877 4.0877-4.0877-4.0877c-2.9418 2.9419-7.7115 2.9419-10.6533 0-2.9419-2.9418-2.9419-7.7115 0-10.6533 2.9418-2.9419 7.7115-2.9419 10.6533 0 2.9419 2.9418 2.9419 7.7115 0 10.6533z" stroke="currentColor" fill="none" fill-rule="evenodd" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">搜索</span></span><span class="DocSearch-Button-Keys"></span></button></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="__docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0 docsWrapper_BCFX"><button aria-label="回到顶部" class="clean-btn theme-back-to-top-button backToTopButton_sjWU" type="button"></button><div class="docPage__5DB"><aside class="theme-doc-sidebar-container docSidebarContainer_b6E3"><div class="sidebarViewport_Xe31"><div class="sidebar_njMd"><nav aria-label="文档侧边栏" class="menu thin-scrollbar menu_SIkG"><ul class="theme-doc-sidebar-menu menu__list"><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh/docs/4.x/">基本概念</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh/docs/4.x/producer/01concept1">生产者</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" href="/zh/docs/4.x/consumer/01concept2">消费者</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/zh/docs/4.x/consumer/01concept2">基础概念</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link menu__link--active" aria-current="page" tabindex="0" href="/zh/docs/4.x/consumer/02push">Push消费</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/zh/docs/4.x/consumer/03pull">Pull消费</a></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh/docs/4.x/deployment/01deploy">部署 & 运维</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh/docs/4.x/bestPractice/01bestpractice">最佳实践</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh/docs/4.x/parameterConfiguration/01local">参数配置</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh/docs/4.x/eventbridge/01RocketMQEventBridgeConcepts">RocketMQ EventBridge</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh/docs/4.x/mqtt/01RocketMQMQTTOverview">RocketMQ MQTT</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh/docs/4.x/streams/01RocketMQ Streams Overview">RocketMQ Streams</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh/docs/4.x/connect/01RocketMQ Connect Overview">RocketMQ Connect</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/zh/docs/4.x/contributionGuide/01how-to-contribute">贡献指南</a></div></li></ul></nav></div></div></aside><main class="docMainContainer_gTbr"><div class="container padding-top--md padding-bottom--lg"><div class="row"><div class="col docItemCol_VOVn"><div class="docItemContainer_Djhp"><article><nav class="theme-doc-breadcrumbs breadcrumbsContainer_Z_bl" aria-label="页面路径"><ul class="breadcrumbs" itemscope="" itemtype="https://schema.org/BreadcrumbList"><li class="breadcrumbs__item"><a aria-label="主页面" class="breadcrumbs__link" href="/zh/"><svg viewBox="0 0 24 24" class="breadcrumbHomeIcon_YNFT"><path d="M10 19v-5h4v5c0 .55.45 1 1 1h3c.55 0 1-.45 1-1v-7h1.7c.46 0 .68-.57.33-.87L12.67 3.6c-.38-.34-.96-.34-1.34 0l-8.36 7.53c-.34.3-.13.87.33.87H5v7c0 .55.45 1 1 1h3c.55 0 1-.45 1-1z" fill="currentColor"></path></svg></a></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">消费者</span><meta itemprop="position" content="1"></li><li itemscope="" itemprop="itemListElement" itemtype="https://schema.org/ListItem" class="breadcrumbs__item breadcrumbs__item--active"><span class="breadcrumbs__link" itemprop="name">Push消费</span><meta itemprop="position" content="2"></li></ul></nav><span class="theme-doc-version-badge badge badge--secondary">版本:4.x</span><div class="tocCollapsible_ETCw theme-doc-toc-mobile tocMobile_ITEo"><button type="button" class="clean-btn tocCollapsibleButton_TO0P">本页总览</button></div><div class="theme-doc-markdown markdown"><h1>Push消费</h1><p>RocketMQ Push消费的示例代码如下</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">public class Consumer {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> public static void main(String[] args) throws InterruptedException, MQClientException {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // 初始化consumer,并设置consumer group name</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> </span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // 设置NameServer地址 </span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.setNamesrvAddr("localhost:9876");</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.subscribe("TopicTest", "*");</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> //注册回调接口来处理从Broker中收到的消息</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.registerMessageListener(new MessageListenerConcurrently() {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> @Override</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> });</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // 启动Consumer</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.start();</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> System.out.printf("Consumer Started.%n");</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>首先需要初始化消费者,初始化消费者时,必须填写ConsumerGroupName,同一个消费组的ConsumerGroupName是相同的,这是判断消费者是否属于同一个消费组的重要属性。然后是设置NameServer地址,这里与Producer一样不再介绍。然后是调用subscribe方法订阅Topic,subscribe方法需要指定需要订阅的Topic名,也可以增加消息过滤的条件,比如TagA等,上述代码中指定*表示接收所有tag的消息。除了订阅之外,还需要注册回调接口编写消费逻辑来处理从Broker中收到的消息,调用registerMessageListener方法,需要传入MessageListener的实现,上述代码中是并发消费,因此是MessageListenerConcurrently的实现,其接口如下</p><div class="theme-admonition theme-admonition-note alert alert--secondary admonition_LlT9"><div class="admonitionHeading_tbUL"><span class="admonitionIcon_kALy"><svg viewBox="0 0 14 16"><path fill-rule="evenodd" d="M6.3 5.69a.942.942 0 0 1-.28-.7c0-.28.09-.52.28-.7.19-.18.42-.28.7-.28.28 0 .52.09.7.28.18.19.28.42.28.7 0 .28-.09.52-.28.7a1 1 0 0 1-.7.3c-.28 0-.52-.11-.7-.3zM8 7.99c-.02-.25-.11-.48-.31-.69-.2-.19-.42-.3-.69-.31H6c-.27.02-.48.13-.69.31-.2.2-.3.44-.31.69h1v3c.02.27.11.5.31.69.2.2.42.31.69.31h1c.27 0 .48-.11.69-.31.2-.19.3-.42.31-.69H8V7.98v.01zM7 2.3c-3.14 0-5.7 2.54-5.7 5.68 0 3.14 2.56 5.7 5.7 5.7s5.7-2.55 5.7-5.7c0-3.15-2.56-5.69-5.7-5.69v.01zM7 .98c3.86 0 7 3.14 7 7s-3.14 7-7 7-7-3.12-7-7 3.14-7 7-7z"></path></svg></span>MessageListenerConcurrently 接口</div><div class="admonitionContent_S0QG"><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">/**</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> */</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">public interface MessageListenerConcurrently extends MessageListener {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> /**</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * consumption failure</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> *</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * @return The consume status</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> */</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> final ConsumeConcurrentlyContext context);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div></div></div><p>其中,msgs是从Broker端获取的需要被消费消息列表,用户实现该接口,并把自己对消息的消费逻辑写在consumeMessage方法中,然后返回消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消费成功,或者表示RECONSUME_LATER表示消费失败,一段时间后再重新消费。</p><p>可以看到RocketMQ提供的消费者API却非常简单,用户并不需要关注重平衡或者拉取的逻辑,只需要写好自己的消费逻辑即可。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="集群模式和广播模式">集群模式和广播模式<a href="#集群模式和广播模式" class="hash-link" aria-label="集群模式和广播模式的直接链接" title="集群模式和广播模式的直接链接"></a></h2><p>我们可以通过以下代码来设置采用集群模式,RocketMQ Push Consumer默认为集群模式,同一个消费组内的消费者分担消费。</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setMessageModel(MessageModel.CLUSTERING);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>通过以下代码来设置采用广播模式,广播模式下,消费组内的每一个消费者都会消费全量消息。</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setMessageModel(MessageModel.BROADCASTING);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="并发消费和顺序消费">并发消费和顺序消费<a href="#并发消费和顺序消费" class="hash-link" aria-label="并发消费和顺序消费的直接链接" title="并发消费和顺序消费的直接链接"></a></h2><p>上面已经介绍设置Push Consumer并发消费的方法,通过在注册消费回调接口时传入MessageListenerConcurrently接口的实现来完成。在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。</p><p>因此RocketMQ提供了顺序消费的方式, 顺序消费设置与并发消费API层面只有一处不同,在注册消费回调接口时传入MessageListenerOrderly接口的实现。</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.registerMessageListener(new MessageListenerOrderly() {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> AtomicLong consumeTimes = new AtomicLong(0);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> @Override</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> this.consumeTimes.incrementAndGet();</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> if ((this.consumeTimes.get() % 2) == 0) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeOrderlyStatus.SUCCESS;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> } else if ((this.consumeTimes.get() % 5) == 0) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> context.setSuspendCurrentQueueTimeMillis(3000);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeOrderlyStatus.SUCCESS;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> });</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>顺序消费也有两种返回结果,ConsumeOrderlyStatus.SUCCESS表示消费成功,ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT表示消费失败。</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="消息过滤">消息过滤<a href="#消息过滤" class="hash-link" aria-label="消息过滤的直接链接" title="消息过滤的直接链接"></a></h2><p>消息过滤是指消息生产者向Topic中发送消息时,设置消息属性对消息进行分类,消费者订阅Topic时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。</p><p>消费者订阅Topic时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic中的所有消息都将被投递到消费端进行消费。</p><p>RocketMQ支持的消息过滤方式有两种,Tag过滤和SQL92过滤。</p><table><thead><tr><th>过滤方式</th><th>说明</th><th>场景</th></tr></thead><tbody><tr><td>Tag过滤</td><td>消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。</td><td>简单过滤场景。一条消息支持设置一个Tag,仅需要对Topic中的消息进行一级分类并过滤时可以使用此方式。</td></tr><tr><td>SQL92过滤</td><td>发送者设置Tag或消息属性,消费者订阅满足SQL92过滤表达式的消息被投递给消费端进行消费。</td><td>复杂过滤场景。一条消息支持设置多个属性,可根据SQL语法自定义组合多种类型的表达式对消息进行多级分类并实现多维度的过滤。</td></tr></tbody></table><h3 class="anchor anchorWithStickyNavbar_LWe7" id="tag过滤">Tag过滤<a href="#tag过滤" class="hash-link" aria-label="Tag过滤的直接链接" title="Tag过滤的直接链接"></a></h3><p>Tag在生产者章节已经介绍过,用于对某个Topic下的消息进行分类。生产者在发送消息时,指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。</p><p>以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以如下消息为例:</p><ul><li>订单消息</li><li>支付消息</li><li>物流消息</li></ul><p>这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:</p><ul><li>支付系统:只需订阅支付消息。</li><li>物流系统:只需订阅物流消息。</li><li>实时计算系统:需要订阅所有和交易相关的消息。</li><li>交易成功率分析系统:需订阅订单和支付消息。</li></ul><p>过滤示意图如下所示</p><p><img loading="lazy" alt="Tag过滤" src="/zh/assets/images/Tag过滤-844cfe6dd033746c7134bde843021ad6.png" width="2009" height="1320" class="img_ev3q"></p><p>对于物流系统和支付系统来说,它们都只订阅单个Tag,此时只需要在调用subcribe接口时明确标明Tag即可。</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("TagFilterTest", "TagA");</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>对于实时计算系统来说,它订阅交易Topic下所有的消息,Tag用星号(*)表示即可。</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("TagFilterTest", "*");</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>对于交易成功率分析系统来说,它订阅了订单和支付两个Tag的消息,在多个Tag之间用两个竖线(||)分隔即可。</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("TagFilterTest", "TagA||TagB");</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>这里需要注意的是,如果同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅为准。</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">//如下错误代码中,Consumer只能订阅到TagFilterTest下TagB的消息,而不能订阅TagA的消息。</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("TagFilterTest", "TagA");</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("TagFilterTest", "TagB");</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h3 class="anchor anchorWithStickyNavbar_LWe7" id="sql92过滤">SQL92过滤<a href="#sql92过滤" class="hash-link" aria-label="SQL92过滤的直接链接" title="SQL92过滤的直接链接"></a></h3><p>SQL92过滤是在消息发送时设置消息的Tag或自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性或Tag过滤消息。</p><blockquote><p>Tag属于一种特殊的消息属性,在SQL语法中,Tag的属性值为TAGS。 |
| 使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。</li></ul><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("SqlFilterTest",</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> "and (a is not null and a between 0 and 3)"));</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="消息重试和死信队列">消息重试和死信队列<a href="#消息重试和死信队列" class="hash-link" aria-label="消息重试和死信队列的直接链接" title="消息重试和死信队列的直接链接"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="消息重试">消息重试<a href="#消息重试" class="hash-link" aria-label="消息重试的直接链接" title="消息重试的直接链接"></a></h3><p>若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列</p><blockquote><p>消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息</p><ul><li>最大重试次数:消息消费失败后,可被重复投递的最大次数。</li></ul></blockquote><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setMaxReconsumeTimes(10);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><ul><li><p>重试间隔:消息消费失败后再次被投递给Consumer消费的间隔时间,只在顺序消费中起作用。</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setSuspendCurrentQueueTimeMillis(5000);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="复制代码到剪贴板" title="复制" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div></li></ul><p>顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。</p><blockquote><p>并发消费失败后并不是投递回原Topic,而是投递到一个特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 |