| <div role="region" aria-label="Skip to main content"><a class="skipToContent_fXgn" href="#__docusaurus_skipToContent_fallback">Skip to main content</a></div><nav aria-label="Main" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="Toggle navigation bar" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/"><div class="navbar__logo"><img src="/img/Apache_RocketMQ_logo.svg.png" alt="My Site Logo" class="themedImage_ToTc themedImage--light_HNdA"><img src="/img/Apache_RocketMQ_logo.svg.png" alt="My Site Logo" class="themedImage_ToTc themedImage--dark_i4oU"></div><b class="navbar__title text--truncate">Apache RocketMQ</b></a></div><div class="navbar__items navbar__items--right"><a href="https://github.com/apache/rocketmq" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link"><svg viewBox="0 0 24 24" width="20" height="20" aria-hidden="true" class="iconLanguage_nlXk"><path fill="currentColor" d="M12.87 15.07l-2.54-2.51.03-.03c1.74-1.94 2.98-4.17 3.71-6.53H17V4h-7V2H8v2H1v1.99h11.17C11.5 7.92 10.44 9.75 9 11.35 8.07 10.32 7.3 9.19 6.69 8h-2c.73 1.63 1.73 3.17 2.98 4.56l-5.09 5.02L4 19l5-5 3.11 3.11.76-2.04zM18.5 10h-2L12 22h2l1.12-3h4.75L21 22h2l-4.5-12zm-2.62 7l1.62-4.33L19.12 17h-3.24z"></path></svg>English</a><ul class="dropdown__menu"><li><a href="/zh/docs/4.x/consumer/02push" target="_self" rel="noopener noreferrer" class="dropdown__link" lang="zh">简体中文</a></li><li><a href="/docs/4.x/consumer/02push" target="_self" rel="noopener noreferrer" class="dropdown__link dropdown__link--active" lang="en">English</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Docs</a><ul class="dropdown__menu"><li><a aria-current="page" class="dropdown__link dropdown__link--active" href="/docs/">5.0</a></li><li><a aria-current="page" class="dropdown__link dropdown__link--active" href="/docs/4.x/">4.x</a></li></ul></div><a class="navbar__item navbar__link" href="/download">Download</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Blog</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/blog">User Cases</a></li><li><a class="dropdown__link" href="/events">Activity</a></li><li><a class="dropdown__link" href="/release-notes">Change Log</a></li><li><a class="dropdown__link" href="/news">RocketMQ News</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Community</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/contact">Join Community</a></li><li><a class="dropdown__link" href="/origin">Origin</a></li><li><a class="dropdown__link" href="/team">Teams</a></li><li><a class="dropdown__link" href="/docs/contributionGuide/01how-to-contribute">Contributions</a></li><li><a class="dropdown__link" href="/enterprise">Enterprises</a></li></ul></div><div class="toggle_vylO colorModeToggle_DEke"><button class="clean-btn toggleButton_gllP toggleButtonDisabled_aARS" type="button" disabled="" title="Switch between dark and light mode (currently light mode)" aria-label="Switch between dark and light mode (currently light mode)" aria-live="polite"><svg viewBox="0 0 24 24" width="24" height="24" class="lightToggleIcon_pyhR"><path fill="currentColor" d="M12,9c1.65,0,3,1.35,3,3s-1.35,3-3,3s-3-1.35-3-3S10.35,9,12,9 M12,7c-2.76,0-5,2.24-5,5s2.24,5,5,5s5-2.24,5-5 S14.76,7,12,7L12,7z M2,13l2,0c0.55,0,1-0.45,1-1s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S1.45,13,2,13z M20,13l2,0c0.55,0,1-0.45,1-1 s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S19.45,13,20,13z M11,2v2c0,0.55,0.45,1,1,1s1-0.45,1-1V2c0-0.55-0.45-1-1-1S11,1.45,11,2z M11,20v2c0,0.55,0.45,1,1,1s1-0.45,1-1v-2c0-0.55-0.45-1-1-1C11.45,19,11,19.45,11,20z M5.99,4.58c-0.39-0.39-1.03-0.39-1.41,0 c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0s0.39-1.03,0-1.41L5.99,4.58z M18.36,16.95 c-0.39-0.39-1.03-0.39-1.41,0c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0c0.39-0.39,0.39-1.03,0-1.41 L18.36,16.95z M19.42,5.99c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06c-0.39,0.39-0.39,1.03,0,1.41 s1.03,0.39,1.41,0L19.42,5.99z M7.05,18.36c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06 c-0.39,0.39-0.39,1.03,0,1.41s1.03,0.39,1.41,0L7.05,18.36z"></path></svg><svg viewBox="0 0 24 24" width="24" height="24" class="darkToggleIcon_wfgR"><path fill="currentColor" d="M9.37,5.51C9.19,6.15,9.1,6.82,9.1,7.5c0,4.08,3.32,7.4,7.4,7.4c0.68,0,1.35-0.09,1.99-0.27C17.45,17.19,14.93,19,12,19 c-3.86,0-7-3.14-7-7C5,9.07,6.81,6.55,9.37,5.51z M12,3c-4.97,0-9,4.03-9,9s4.03,9,9,9s9-4.03,9-9c0-0.46-0.04-0.92-0.1-1.36 c-0.98,1.37-2.58,2.26-4.4,2.26c-2.98,0-5.4-2.42-5.4-5.4c0-1.81,0.89-3.42,2.26-4.4C12.92,3.04,12.46,3,12,3L12,3z"></path></svg></button></div><div class="searchBox_ZlJk"><button type="button" class="DocSearch DocSearch-Button" aria-label="Search"><span class="DocSearch-Button-Container"><svg width="20" height="20" class="DocSearch-Search-Icon" viewBox="0 0 20 20" aria-hidden="true"><path d="M14.386 14.386l4.0877 4.0877-4.0877-4.0877c-2.9418 2.9419-7.7115 2.9419-10.6533 0-2.9419-2.9418-2.9419-7.7115 0-10.6533 2.9418-2.9419 7.7115-2.9419 10.6533 0 2.9419 2.9418 2.9419 7.7115 0 10.6533z" stroke="currentColor" fill="none" fill-rule="evenodd" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">Search</span></span><span class="DocSearch-Button-Keys"></span></button></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="__docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0 docsWrapper_BCFX"><button aria-label="Scroll back to top" class="clean-btn theme-back-to-top-button backToTopButton_sjWU" type="button"></button><div class="docPage__5DB"><aside class="theme-doc-sidebar-container docSidebarContainer_b6E3"><div class="sidebarViewport_Xe31"><div class="sidebar_njMd"><nav aria-label="Docs sidebar" class="menu thin-scrollbar menu_SIkG"><ul class="theme-doc-sidebar-menu menu__list"><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/">Introduction</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/producer/01concept1">Producer</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" href="/docs/4.x/consumer/01concept2">Consumer</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/4.x/consumer/01concept2">Core Concept</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link menu__link--active" aria-current="page" tabindex="0" href="/docs/4.x/consumer/02push">Push Consumer</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/4.x/consumer/03pull">Pull Consumer</a></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/deployment/01deploy">Deployment & Operations</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/bestPractice/01bestpractice">Best Practice</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/parameterConfiguration/01local">Parameter Configuration</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/eventbridge/01RocketMQEventBridgeConcepts">RocketMQ EventBridge</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/mqtt/01RocketMQMQTTOverview">RocketMQ MQTT</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/streams/01RocketMQ Streams Overview">RocketMQ Streams</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/connect/01RocketMQ Connect Overview">RocketMQ Connect</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/contributionGuide/01how-to-contribute">Contribution Guide</a></div></li></ul></nav></div></div></aside><main class="docMainContainer_gTbr"><div class="container padding-top--md padding-bottom--lg"><div class="row"><div class="col docItemCol_VOVn"><div class="docItemContainer_Djhp"><article><nav class="theme-doc-breadcrumbs breadcrumbsContainer_Z_bl" aria-label="Breadcrumbs"><ul class="breadcrumbs" itemscope="" itemtype="https://schema.org/BreadcrumbList"><li class="breadcrumbs__item"><a aria-label="Home page" class="breadcrumbs__link" href="/"><svg viewBox="0 0 24 24" class="breadcrumbHomeIcon_YNFT"><path d="M10 19v-5h4v5c0 .55.45 1 1 1h3c.55 0 1-.45 1-1v-7h1.7c.46 0 .68-.57.33-.87L12.67 3.6c-.38-.34-.96-.34-1.34 0l-8.36 7.53c-.34.3-.13.87.33.87H5v7c0 .55.45 1 1 1h3c.55 0 1-.45 1-1z" fill="currentColor"></path></svg></a></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">Consumer</span><meta itemprop="position" content="1"></li><li itemscope="" itemprop="itemListElement" itemtype="https://schema.org/ListItem" class="breadcrumbs__item breadcrumbs__item--active"><span class="breadcrumbs__link" itemprop="name">Push Consumer</span><meta itemprop="position" content="2"></li></ul></nav><span class="theme-doc-version-badge badge badge--secondary">Version: 4.x</span><div class="tocCollapsible_ETCw theme-doc-toc-mobile tocMobile_ITEo"><button type="button" class="clean-btn tocCollapsibleButton_TO0P">On this page</button></div><div class="theme-doc-markdown markdown"><h1>Push Consumer</h1><p>The simple code of RocketMQ Push Consumer is as follows:</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">public class Consumer {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> public static void main(String[] args) throws InterruptedException, MQClientException {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Initialize Consumer and set Consumer Goup Name</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> </span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Set the address of NameServer </span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.setNamesrvAddr("localhost:9876");</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Subscribe One or more of topics,and specify the tag filtering conditions, here specify * means receive all tag messages</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.subscribe("TopicTest", "*");</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Register a callback interface to handle messages received from the Broker</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.registerMessageListener(new MessageListenerConcurrently() {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> @Override</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Return to the message consumption status, ConsumeConcurrentlyStatus.CONSUME_SUCCESS for successful consumption</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> });</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Start Consumer</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.start();</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> System.out.printf("Consumer Started.%n");</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>First, initialize the consumer. When initializing the consumer, the consumer must with the ConsumerGroupName, the ConsumerGroupName of the same consumer group is the same, which is an important attribute to determine whether the consumer belongs to the same consumer group. Then, set the NameServer address, which is not introduced here as like Producer. Then, call the subscribe method to subscribe to Topic. The subscribe method needs to specify the Topic name needed to subscribe to, it can also add the message filtering conditions, such as TagA, etc. The above code to specify * means to receive all tag messages. In addition to subscribing, it also needs to register the callback interface to write the consumption logic to handle the messages received from the Broker. If call the registerMessageListener method, it needs to pass in the MessageListener implementation. The above code is concurrent consumption, so it is MessageListenerConcurrently implementation, its interface is as follows:</p><div class="theme-admonition theme-admonition-note alert alert--secondary admonition_LlT9"><div class="admonitionHeading_tbUL"><span class="admonitionIcon_kALy"><svg viewBox="0 0 14 16"><path fill-rule="evenodd" d="M6.3 5.69a.942.942 0 0 1-.28-.7c0-.28.09-.52.28-.7.19-.18.42-.28.7-.28.28 0 .52.09.7.28.18.19.28.42.28.7 0 .28-.09.52-.28.7a1 1 0 0 1-.7.3c-.28 0-.52-.11-.7-.3zM8 7.99c-.02-.25-.11-.48-.31-.69-.2-.19-.42-.3-.69-.31H6c-.27.02-.48.13-.69.31-.2.2-.3.44-.31.69h1v3c.02.27.11.5.31.69.2.2.42.31.69.31h1c.27 0 .48-.11.69-.31.2-.19.3-.42.31-.69H8V7.98v.01zM7 2.3c-3.14 0-5.7 2.54-5.7 5.68 0 3.14 2.56 5.7 5.7 5.7s5.7-2.55 5.7-5.7c0-3.15-2.56-5.69-5.7-5.69v.01zM7 .98c3.86 0 7 3.14 7 7s-3.14 7-7 7-7-3.12-7-7 3.14-7 7-7z"></path></svg></span>MessageListenerConcurrently Interface</div><div class="admonitionContent_S0QG"><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">/**</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> */</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">public interface MessageListenerConcurrently extends MessageListener {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> /**</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * consumption failure</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> *</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * @return The consume status</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> */</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> final ConsumeConcurrentlyContext context);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div></div></div><p>where msgs is the list of messages to be consumed obtained from the Broker, and the user implements the interface and writes the consumption logic for the messages in the consumeMessage method, and then returns the consumption status, ConsumeConcurrentlyStatus.CONSUME_SUCCESS indicates successful consumption, or CONSUME_LATER means that the consumption has failed and will be re-consumed after a period of time.</p><p>The RocketMQ provides a very simple consumer API, users don't need to focus on rebalancing or pulling logic, they just need to write their own consumption logic.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="cluster-and-broadcast-mode">Cluster and Broadcast Mode<a href="#cluster-and-broadcast-mode" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>We can set it to use cluster mode by the following code. RocketMQ Push Consumer uses cluster mode by default, where consumers in the same consumer group consume together.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setMessageModel(MessageModel.CLUSTERING);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Set up the use of broadcast mode with the following code. In broadcast mode, each consumer within the consumer group consumes the full messages.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setMessageModel(MessageModel.BROADCASTING);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="concurrent-consumption-and-order-consumption">Concurrent Consumption and Order Consumption<a href="#concurrent-consumption-and-order-consumption" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>Setting up Push Consumer concurrent consumption has been described above and is accomplished by passing in the implementation of the MessageListenerConcurrently interface when registering the consumption callback interface. In concurrent consumption, there may be multiple threads consuming messages from a queue at the same time, so even if the sender ensures that messages are in the same queue in FIFO order by sending order messages, there is no guarantee that the messages are actually consumed orderly.</p><p>RocketMQ therefore provides a order consumption approach. The only difference between order consumption setup and concurrent consumption at the API level is that the implementation of the MessageListenerOrderly interface is passed in when registering the consumption callback interface.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.registerMessageListener(new MessageListenerOrderly() {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> AtomicLong consumeTimes = new AtomicLong(0);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> @Override</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> this.consumeTimes.incrementAndGet();</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> if ((this.consumeTimes.get() % 2) == 0) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeOrderlyStatus.SUCCESS;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> } else if ((this.consumeTimes.get() % 5) == 0) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> context.setSuspendCurrentQueueTimeMillis(3000);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeOrderlyStatus.SUCCESS;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> });</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>There are also two return results for order consumption, ConsumeOrderlyStatus.SUCCESS for successful consumption and ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT for failed consumption.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="message-filtering">Message Filtering<a href="#message-filtering" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>Message filtering means that message producers set message attributes to classify messages when sending messages to a Topic, and consumers set filtering conditions according to the message attributes when subscribing to a Topic, so that only messages that meet the filtering conditions are delivered to the consumer side for consumption.</p><p>If the consumer subscribes to a Topic without setting filter conditions, all messages in the Topic will be delivered to the consumer for consumption, regardless of whether the filter attributes are set when the message is sent.</p><p>There are two types of message filtering supported by RocketMQ, Tag filtering and SQL92 filtering.</p><table><thead><tr><th>Message Filtering</th><th>Instruct</th><th>Scenario</th></tr></thead><tbody><tr><td>Tag filtering</td><td>If the Tag subscribed by the consumer and the message Tag set by the sender match each other, the message is cast to the consumer for consumption.</td><td>Simple filtering Scenario: a message supports setting one Tag, which can be used when only one level of classification and filtering of messages in Topic is required.</td></tr><tr><td>SQL92 filtering</td><td>The sender sets the Tag or message attribute, and the consumer subscribes to the message that satisfies the SQL92 filter expression is cast to the consumer for consumption.</td><td>Complex filtering Scenarios: a message supports setting multiple attributes and can be customized to combine multiple types of expressions according to SQL syntax to classify messages at multiple levels and achieve multi-dimensional filtering.</td></tr></tbody></table><h3 class="anchor anchorWithStickyNavbar_LWe7" id="tag-filtering">Tag Filtering<a href="#tag-filtering" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>Tag has been introduced in the Producers chapter and is used to classify messages under a certain Topic. When sending a message, the producer specifies the Tag of the message, and the consumer has to subscribe according to the Tag already specified.</p><p>Take the following e-commerce transaction scenario as an example, the process from the customer's order to the receipt of goods will produce a series of messages, as follows:</p><ul><li>Order News</li><li>Payment News</li><li>Logistics News</li></ul><p>These messages are sent to a Topic with the name Trade_Topic and are subscribed to by various systems, as exemplified by the following:</p><ul><li>Payment system: subscribe to payment messages only.</li><li>Logistics system: subscribe to logistics messages only.</li><li>Real-time calculation system: subscribe to all transaction-related messages.</li><li>Transaction success rate analysis system: subscribe to order and payment messages.</li></ul><p>The filtering schematic is shown below</p><p><img loading="lazy" alt="Tag过滤" src="/assets/images/Tag过滤-844cfe6dd033746c7134bde843021ad6.png" width="2009" height="1320" class="img_ev3q"></p><p>For logistics systems and payment systems, they both subscribe to a single Tag, at which point it is sufficient to mark the Tag when calling the subcribe interface.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("TagFilterTest", "TagA");</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>For a real-time computing system, it subscribes to all messages under the transaction Topic, and the Tag is simply indicated by an asterisk (*).</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("TagFilterTest", "*");</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>For the transaction success rate analysis system, it subscribes to messages for both Order and Payment Tags, and it is fine to separate multiple Tags with two vertical lines (||).</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("TagFilterTest", "TagA||TagB");</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>It should be noted here that if the same consumer subscribes to a Tag under a Topic multiple times, the last subscription will prevail.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">//In the following error code, the Consumer can only subscribe to the message of TagB under TagFilterTest, but not the message of TagA.</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("TagFilterTest", "TagA");</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("TagFilterTest", "TagB");</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h3 class="anchor anchorWithStickyNavbar_LWe7" id="sql92-filtering">SQL92 Filtering<a href="#sql92-filtering" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>SQL92 filtering is to set the Tag or custom attribute of the message when the message is sent, and the consumer subscribes to set the filter expression using SQL syntax to filter the message based on the custom attribute or Tag.</p><blockquote><p>Tag belongs to a special kind of message property, and the property value of Tag is TAGS in the SQL syntax. |
| Set filter expressions using SQL syntax and filter messages based on custom properties.</li></ul><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe("SqlFilterTest",</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> "and (a is not null and a between 0 and 3)"));</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="message-retry-and-dead-letter-queue">Message Retry and Dead-Letter Queue<a href="#message-retry-and-dead-letter-queue" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="message-retry">Message Retry<a href="#message-retry" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>If the Consumer fails to consume a message, RocketMQ will re-pitch the message to the Consumer after the retry interval, and if the message is not successfully consumed after the maximum number of retries, the message will be pitched to the dead message queue.</p><blockquote><p>Message retry is only effective for cluster mode; broadcast mode does not provide the message retry feature. In the broadcast mode, after a failed consumption, the failed message will not be retry and continue to consume new messages.</p></blockquote><ul><li>Maximum number of retries: the maximum number of times a message can be repeatedly delivered after a failed consumption.</li></ul><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setMaxReconsumeTimes(10);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Retry interval: the interval after the message consumption fails to be cast to the Consumer again for consumption, which only works in sequential consumption.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setSuspendCurrentQueueTimeMillis(5000);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>The retry mechanism of order consumption and concurrent consumption is not the same. After the order consumption fails to consume, it will first retry locally on the client side until the maximum number of retries, so as to avoid the failed messages being skipped and consuming the next message and disrupting the order of order consumption, while the concurrent consumption will re-cast the failed messages back to the server after the failed consumption, and then wait for the server to re-cast them back, during which it will normally consume the messages behind the queue.</p><blockquote><p>When concurrent consumption fails, it is not cast back to the original Topic, but to a special Topic named %RETRY%ConsumerGroupName, and each ConsumerGroup in cluster mode will correspond to a special Topic and will subscribe to that Topic. |