blob: 25263c7736f472466b73eabaf263f2f9d65f890c [file] [log] [blame]
<!doctype html>
<html lang="en" dir="ltr" class="docs-wrapper docs-doc-page docs-version-current plugin-docs plugin-id-default docs-doc-id-consumer/02push" data-has-hydrated="false">
<head>
<meta charset="UTF-8">
<meta name="generator" content="Docusaurus v2.4.3">
<title data-rh="true">Push Consumer | RocketMQ</title><meta data-rh="true" name="viewport" content="width=device-width,initial-scale=1"><meta data-rh="true" name="twitter:card" content="summary_large_image"><meta data-rh="true" property="og:url" content="https://your-docusaurus-test-site.com/docs/4.x/consumer/02push"><meta data-rh="true" name="docusaurus_locale" content="en"><meta data-rh="true" name="docsearch:language" content="en"><meta data-rh="true" name="docusaurus_version" content="current"><meta data-rh="true" name="docusaurus_tag" content="docs-default-current"><meta data-rh="true" name="docsearch:version" content="current"><meta data-rh="true" name="docsearch:docusaurus_tag" content="docs-default-current"><meta data-rh="true" property="og:title" content="Push Consumer | RocketMQ"><meta data-rh="true" name="description" content="The simple code of RocketMQ Push Consumer is as follows:"><meta data-rh="true" property="og:description" content="The simple code of RocketMQ Push Consumer is as follows:"><link data-rh="true" rel="icon" href="/img/favicon.ico"><link data-rh="true" rel="canonical" href="https://your-docusaurus-test-site.com/docs/4.x/consumer/02push"><link data-rh="true" rel="alternate" href="https://your-docusaurus-test-site.com/zh/docs/4.x/consumer/02push" hreflang="zh"><link data-rh="true" rel="alternate" href="https://your-docusaurus-test-site.com/docs/4.x/consumer/02push" hreflang="en"><link data-rh="true" rel="alternate" href="https://your-docusaurus-test-site.com/docs/4.x/consumer/02push" hreflang="x-default"><link data-rh="true" rel="preconnect" href="https://R2IYF7ETH7-dsn.algolia.net" crossorigin="anonymous"><link rel="alternate" type="application/rss+xml" href="/blog/rss.xml" title="RocketMQ RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/blog/atom.xml" title="RocketMQ Atom Feed">
<link rel="search" type="application/opensearchdescription+xml" title="RocketMQ" href="/opensearch.xml">
<script>var _hmt=_hmt||[];!function(){var e=document.createElement("script");e.src="https://hm.baidu.com/hm.js?36428f2b841d08e7405724cbf7f860d2";var t=document.getElementsByTagName("script")[0];t.parentNode.insertBefore(e,t)}()</script>
<link rel="preconnect" href="https://www.google-analytics.com">
<script>window.ga=window.ga||function(){(ga.q=ga.q||[]).push(arguments)},ga.l=+new Date,ga("create","UA-89603173-1","auto"),ga("set","anonymizeIp",!0),ga("send","pageview")</script>
<script async src="https://www.google-analytics.com/analytics.js"></script>
<link rel="alternate" type="application/rss+xml" href="/events/rss.xml" title="RocketMQ RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/events/atom.xml" title="RocketMQ Atom Feed">
<link rel="alternate" type="application/rss+xml" href="/release-notes/rss.xml" title="RocketMQ RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/release-notes/atom.xml" title="RocketMQ Atom Feed">
<link rel="alternate" type="application/rss+xml" href="/news/rss.xml" title="RocketMQ RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/news/atom.xml" title="RocketMQ Atom Feed">
<link rel="stylesheet" href="//g.alicdn.com/mamba/assets/0.0.13/mse-arc-ui.min.css">
<script src="//g.alicdn.com/mamba/assets/0.0.13/mse-arc-ui.min.js"></script><link rel="stylesheet" href="/assets/css/styles.b006b670.css">
<link rel="preload" href="/assets/js/runtime~main.9fb1bb92.js" as="script">
<link rel="preload" href="/assets/js/main.db9ae330.js" as="script">
</head>
<body class="navigation-with-keyboard">
<script>!function(){function t(t){document.documentElement.setAttribute("data-theme",t)}var e=function(){var t=null;try{t=new URLSearchParams(window.location.search).get("docusaurus-theme")}catch(t){}return t}()||function(){var t=null;try{t=localStorage.getItem("theme")}catch(t){}return t}();t(null!==e?e:"light")}()</script><div id="__docusaurus">
<div role="region" aria-label="Skip to main content"><a class="skipToContent_fXgn" href="#__docusaurus_skipToContent_fallback">Skip to main content</a></div><nav aria-label="Main" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="Toggle navigation bar" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/"><div class="navbar__logo"><img src="/img/Apache_RocketMQ_logo.svg.png" alt="My Site Logo" class="themedImage_ToTc themedImage--light_HNdA"><img src="/img/Apache_RocketMQ_logo.svg.png" alt="My Site Logo" class="themedImage_ToTc themedImage--dark_i4oU"></div><b class="navbar__title text--truncate">Apache RocketMQ</b></a></div><div class="navbar__items navbar__items--right"><a href="https://github.com/apache/rocketmq" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link"><svg viewBox="0 0 24 24" width="20" height="20" aria-hidden="true" class="iconLanguage_nlXk"><path fill="currentColor" d="M12.87 15.07l-2.54-2.51.03-.03c1.74-1.94 2.98-4.17 3.71-6.53H17V4h-7V2H8v2H1v1.99h11.17C11.5 7.92 10.44 9.75 9 11.35 8.07 10.32 7.3 9.19 6.69 8h-2c.73 1.63 1.73 3.17 2.98 4.56l-5.09 5.02L4 19l5-5 3.11 3.11.76-2.04zM18.5 10h-2L12 22h2l1.12-3h4.75L21 22h2l-4.5-12zm-2.62 7l1.62-4.33L19.12 17h-3.24z"></path></svg>English</a><ul class="dropdown__menu"><li><a href="/zh/docs/4.x/consumer/02push" target="_self" rel="noopener noreferrer" class="dropdown__link" lang="zh">简体中文</a></li><li><a href="/docs/4.x/consumer/02push" target="_self" rel="noopener noreferrer" class="dropdown__link dropdown__link--active" lang="en">English</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Docs</a><ul class="dropdown__menu"><li><a aria-current="page" class="dropdown__link dropdown__link--active" href="/docs/">5.0</a></li><li><a aria-current="page" class="dropdown__link dropdown__link--active" href="/docs/4.x/">4.x</a></li></ul></div><a class="navbar__item navbar__link" href="/download">Download</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Blog</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/blog">User Cases</a></li><li><a class="dropdown__link" href="/events">Activity</a></li><li><a class="dropdown__link" href="/release-notes">Change Log</a></li><li><a class="dropdown__link" href="/news">RocketMQ News</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Community</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/contact">Join Community</a></li><li><a class="dropdown__link" href="/origin">Origin</a></li><li><a class="dropdown__link" href="/team">Teams</a></li><li><a class="dropdown__link" href="/docs/contributionGuide/01how-to-contribute">Contributions</a></li><li><a class="dropdown__link" href="/enterprise">Enterprises</a></li></ul></div><div class="toggle_vylO colorModeToggle_DEke"><button class="clean-btn toggleButton_gllP toggleButtonDisabled_aARS" type="button" disabled="" title="Switch between dark and light mode (currently light mode)" aria-label="Switch between dark and light mode (currently light mode)" aria-live="polite"><svg viewBox="0 0 24 24" width="24" height="24" class="lightToggleIcon_pyhR"><path fill="currentColor" d="M12,9c1.65,0,3,1.35,3,3s-1.35,3-3,3s-3-1.35-3-3S10.35,9,12,9 M12,7c-2.76,0-5,2.24-5,5s2.24,5,5,5s5-2.24,5-5 S14.76,7,12,7L12,7z M2,13l2,0c0.55,0,1-0.45,1-1s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S1.45,13,2,13z M20,13l2,0c0.55,0,1-0.45,1-1 s-0.45-1-1-1l-2,0c-0.55,0-1,0.45-1,1S19.45,13,20,13z M11,2v2c0,0.55,0.45,1,1,1s1-0.45,1-1V2c0-0.55-0.45-1-1-1S11,1.45,11,2z M11,20v2c0,0.55,0.45,1,1,1s1-0.45,1-1v-2c0-0.55-0.45-1-1-1C11.45,19,11,19.45,11,20z M5.99,4.58c-0.39-0.39-1.03-0.39-1.41,0 c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0s0.39-1.03,0-1.41L5.99,4.58z M18.36,16.95 c-0.39-0.39-1.03-0.39-1.41,0c-0.39,0.39-0.39,1.03,0,1.41l1.06,1.06c0.39,0.39,1.03,0.39,1.41,0c0.39-0.39,0.39-1.03,0-1.41 L18.36,16.95z M19.42,5.99c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06c-0.39,0.39-0.39,1.03,0,1.41 s1.03,0.39,1.41,0L19.42,5.99z M7.05,18.36c0.39-0.39,0.39-1.03,0-1.41c-0.39-0.39-1.03-0.39-1.41,0l-1.06,1.06 c-0.39,0.39-0.39,1.03,0,1.41s1.03,0.39,1.41,0L7.05,18.36z"></path></svg><svg viewBox="0 0 24 24" width="24" height="24" class="darkToggleIcon_wfgR"><path fill="currentColor" d="M9.37,5.51C9.19,6.15,9.1,6.82,9.1,7.5c0,4.08,3.32,7.4,7.4,7.4c0.68,0,1.35-0.09,1.99-0.27C17.45,17.19,14.93,19,12,19 c-3.86,0-7-3.14-7-7C5,9.07,6.81,6.55,9.37,5.51z M12,3c-4.97,0-9,4.03-9,9s4.03,9,9,9s9-4.03,9-9c0-0.46-0.04-0.92-0.1-1.36 c-0.98,1.37-2.58,2.26-4.4,2.26c-2.98,0-5.4-2.42-5.4-5.4c0-1.81,0.89-3.42,2.26-4.4C12.92,3.04,12.46,3,12,3L12,3z"></path></svg></button></div><div class="searchBox_ZlJk"><button type="button" class="DocSearch DocSearch-Button" aria-label="Search"><span class="DocSearch-Button-Container"><svg width="20" height="20" class="DocSearch-Search-Icon" viewBox="0 0 20 20" aria-hidden="true"><path d="M14.386 14.386l4.0877 4.0877-4.0877-4.0877c-2.9418 2.9419-7.7115 2.9419-10.6533 0-2.9419-2.9418-2.9419-7.7115 0-10.6533 2.9418-2.9419 7.7115-2.9419 10.6533 0 2.9419 2.9418 2.9419 7.7115 0 10.6533z" stroke="currentColor" fill="none" fill-rule="evenodd" stroke-linecap="round" stroke-linejoin="round"></path></svg><span class="DocSearch-Button-Placeholder">Search</span></span><span class="DocSearch-Button-Keys"></span></button></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="__docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0 docsWrapper_BCFX"><button aria-label="Scroll back to top" class="clean-btn theme-back-to-top-button backToTopButton_sjWU" type="button"></button><div class="docPage__5DB"><aside class="theme-doc-sidebar-container docSidebarContainer_b6E3"><div class="sidebarViewport_Xe31"><div class="sidebar_njMd"><nav aria-label="Docs sidebar" class="menu thin-scrollbar menu_SIkG"><ul class="theme-doc-sidebar-menu menu__list"><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/">Introduction</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/producer/01concept1">Producer</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret menu__link--active" aria-expanded="true" href="/docs/4.x/consumer/01concept2">Consumer</a></div><ul style="display:block;overflow:visible;height:auto" class="menu__list"><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/4.x/consumer/01concept2">Core Concept</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link menu__link--active" aria-current="page" tabindex="0" href="/docs/4.x/consumer/02push">Push Consumer</a></li><li class="theme-doc-sidebar-item-link theme-doc-sidebar-item-link-level-2 menu__list-item"><a class="menu__link" tabindex="0" href="/docs/4.x/consumer/03pull">Pull Consumer</a></li></ul></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/deployment/01deploy">Deployment &amp; Operations</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/bestPractice/01bestpractice">Best Practice</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/parameterConfiguration/01local">Parameter Configuration</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/eventbridge/01RocketMQEventBridgeConcepts">RocketMQ EventBridge</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/mqtt/01RocketMQMQTTOverview">RocketMQ MQTT</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/streams/01RocketMQ Streams Overview">RocketMQ Streams</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/connect/01RocketMQ Connect Overview">RocketMQ Connect</a></div></li><li class="theme-doc-sidebar-item-category theme-doc-sidebar-item-category-level-1 menu__list-item menu__list-item--collapsed"><div class="menu__list-item-collapsible"><a class="menu__link menu__link--sublist menu__link--sublist-caret" aria-expanded="false" href="/docs/4.x/contributionGuide/01how-to-contribute">Contribution Guide</a></div></li></ul></nav></div></div></aside><main class="docMainContainer_gTbr"><div class="container padding-top--md padding-bottom--lg"><div class="row"><div class="col docItemCol_VOVn"><div class="docItemContainer_Djhp"><article><nav class="theme-doc-breadcrumbs breadcrumbsContainer_Z_bl" aria-label="Breadcrumbs"><ul class="breadcrumbs" itemscope="" itemtype="https://schema.org/BreadcrumbList"><li class="breadcrumbs__item"><a aria-label="Home page" class="breadcrumbs__link" href="/"><svg viewBox="0 0 24 24" class="breadcrumbHomeIcon_YNFT"><path d="M10 19v-5h4v5c0 .55.45 1 1 1h3c.55 0 1-.45 1-1v-7h1.7c.46 0 .68-.57.33-.87L12.67 3.6c-.38-.34-.96-.34-1.34 0l-8.36 7.53c-.34.3-.13.87.33.87H5v7c0 .55.45 1 1 1h3c.55 0 1-.45 1-1z" fill="currentColor"></path></svg></a></li><li class="breadcrumbs__item"><span class="breadcrumbs__link">Consumer</span><meta itemprop="position" content="1"></li><li itemscope="" itemprop="itemListElement" itemtype="https://schema.org/ListItem" class="breadcrumbs__item breadcrumbs__item--active"><span class="breadcrumbs__link" itemprop="name">Push Consumer</span><meta itemprop="position" content="2"></li></ul></nav><span class="theme-doc-version-badge badge badge--secondary">Version: 4.x</span><div class="tocCollapsible_ETCw theme-doc-toc-mobile tocMobile_ITEo"><button type="button" class="clean-btn tocCollapsibleButton_TO0P">On this page</button></div><div class="theme-doc-markdown markdown"><h1>Push Consumer</h1><p>The simple code of RocketMQ Push Consumer is as follows:</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">public class Consumer {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> public static void main(String[] args) throws InterruptedException, MQClientException {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Initialize Consumer and set Consumer Goup Name</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(&quot;please_rename_unique_group_name&quot;);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> </span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Set the address of NameServer </span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.setNamesrvAddr(&quot;localhost:9876&quot;);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Subscribe One or more of topics,and specify the tag filtering conditions, here specify * means receive all tag messages</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.subscribe(&quot;TopicTest&quot;, &quot;*&quot;);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Register a callback interface to handle messages received from the Broker</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.registerMessageListener(new MessageListenerConcurrently() {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> @Override</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> public ConsumeConcurrentlyStatus consumeMessage(List&lt;MessageExt&gt; msgs, ConsumeConcurrentlyContext context) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> System.out.printf(&quot;%s Receive New Messages: %s %n&quot;, Thread.currentThread().getName(), msgs);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Return to the message consumption status, ConsumeConcurrentlyStatus.CONSUME_SUCCESS for successful consumption</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> });</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> // Start Consumer</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> consumer.start();</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> System.out.printf(&quot;Consumer Started.%n&quot;);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>First, initialize the consumer. When initializing the consumer, the consumer must with the ConsumerGroupName, the ConsumerGroupName of the same consumer group is the same, which is an important attribute to determine whether the consumer belongs to the same consumer group. Then, set the NameServer address, which is not introduced here as like Producer. Then, call the subscribe method to subscribe to Topic. The subscribe method needs to specify the Topic name needed to subscribe to, it can also add the message filtering conditions, such as TagA, etc. The above code to specify * means to receive all tag messages. In addition to subscribing, it also needs to register the callback interface to write the consumption logic to handle the messages received from the Broker. If call the registerMessageListener method, it needs to pass in the MessageListener implementation. The above code is concurrent consumption, so it is MessageListenerConcurrently implementation, its interface is as follows:</p><div class="theme-admonition theme-admonition-note alert alert--secondary admonition_LlT9"><div class="admonitionHeading_tbUL"><span class="admonitionIcon_kALy"><svg viewBox="0 0 14 16"><path fill-rule="evenodd" d="M6.3 5.69a.942.942 0 0 1-.28-.7c0-.28.09-.52.28-.7.19-.18.42-.28.7-.28.28 0 .52.09.7.28.18.19.28.42.28.7 0 .28-.09.52-.28.7a1 1 0 0 1-.7.3c-.28 0-.52-.11-.7-.3zM8 7.99c-.02-.25-.11-.48-.31-.69-.2-.19-.42-.3-.69-.31H6c-.27.02-.48.13-.69.31-.2.2-.3.44-.31.69h1v3c.02.27.11.5.31.69.2.2.42.31.69.31h1c.27 0 .48-.11.69-.31.2-.19.3-.42.31-.69H8V7.98v.01zM7 2.3c-3.14 0-5.7 2.54-5.7 5.68 0 3.14 2.56 5.7 5.7 5.7s5.7-2.55 5.7-5.7c0-3.15-2.56-5.69-5.7-5.69v.01zM7 .98c3.86 0 7 3.14 7 7s-3.14 7-7 7-7-3.12-7-7 3.14-7 7-7z"></path></svg></span>MessageListenerConcurrently Interface</div><div class="admonitionContent_S0QG"><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">/**</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> */</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">public interface MessageListenerConcurrently extends MessageListener {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> /**</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * consumption failure</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> *</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * @param msgs msgs.size() &gt;= 1&lt;br&gt; DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> * @return The consume status</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> */</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> ConsumeConcurrentlyStatus consumeMessage(final List&lt;MessageExt&gt; msgs,</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> final ConsumeConcurrentlyContext context);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">}</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div></div></div><p>where msgs is the list of messages to be consumed obtained from the Broker, and the user implements the interface and writes the consumption logic for the messages in the consumeMessage method, and then returns the consumption status, ConsumeConcurrentlyStatus.CONSUME_SUCCESS indicates successful consumption, or CONSUME_LATER means that the consumption has failed and will be re-consumed after a period of time.</p><p>The RocketMQ provides a very simple consumer API, users don&#x27;t need to focus on rebalancing or pulling logic, they just need to write their own consumption logic.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="cluster-and-broadcast-mode">Cluster and Broadcast Mode<a href="#cluster-and-broadcast-mode" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>We can set it to use cluster mode by the following code. RocketMQ Push Consumer uses cluster mode by default, where consumers in the same consumer group consume together.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setMessageModel(MessageModel.CLUSTERING);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Set up the use of broadcast mode with the following code. In broadcast mode, each consumer within the consumer group consumes the full messages.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setMessageModel(MessageModel.BROADCASTING);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="concurrent-consumption-and-order-consumption">Concurrent Consumption and Order Consumption<a href="#concurrent-consumption-and-order-consumption" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>Setting up Push Consumer concurrent consumption has been described above and is accomplished by passing in the implementation of the MessageListenerConcurrently interface when registering the consumption callback interface. In concurrent consumption, there may be multiple threads consuming messages from a queue at the same time, so even if the sender ensures that messages are in the same queue in FIFO order by sending order messages, there is no guarantee that the messages are actually consumed orderly.</p><p>RocketMQ therefore provides a order consumption approach. The only difference between order consumption setup and concurrent consumption at the API level is that the implementation of the MessageListenerOrderly interface is passed in when registering the consumption callback interface.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.registerMessageListener(new MessageListenerOrderly() {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> AtomicLong consumeTimes = new AtomicLong(0);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> @Override</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> public ConsumeOrderlyStatus consumeMessage(List&lt;MessageExt&gt; msgs, ConsumeOrderlyContext context) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> System.out.printf(&quot;%s Receive New Messages: %s %n&quot;, Thread.currentThread().getName(), msgs);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> this.consumeTimes.incrementAndGet();</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> if ((this.consumeTimes.get() % 2) == 0) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeOrderlyStatus.SUCCESS;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> } else if ((this.consumeTimes.get() % 5) == 0) {</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> context.setSuspendCurrentQueueTimeMillis(3000);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> return ConsumeOrderlyStatus.SUCCESS;</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> }</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> });</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>There are also two return results for order consumption, ConsumeOrderlyStatus.SUCCESS for successful consumption and ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT for failed consumption.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="message-filtering">Message Filtering<a href="#message-filtering" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><p>Message filtering means that message producers set message attributes to classify messages when sending messages to a Topic, and consumers set filtering conditions according to the message attributes when subscribing to a Topic, so that only messages that meet the filtering conditions are delivered to the consumer side for consumption.</p><p>If the consumer subscribes to a Topic without setting filter conditions, all messages in the Topic will be delivered to the consumer for consumption, regardless of whether the filter attributes are set when the message is sent.</p><p>There are two types of message filtering supported by RocketMQ, Tag filtering and SQL92 filtering.</p><table><thead><tr><th>Message Filtering</th><th>Instruct</th><th>Scenario</th></tr></thead><tbody><tr><td>Tag filtering</td><td>If the Tag subscribed by the consumer and the message Tag set by the sender match each other, the message is cast to the consumer for consumption.</td><td>Simple filtering Scenario: a message supports setting one Tag, which can be used when only one level of classification and filtering of messages in Topic is required.</td></tr><tr><td>SQL92 filtering</td><td>The sender sets the Tag or message attribute, and the consumer subscribes to the message that satisfies the SQL92 filter expression is cast to the consumer for consumption.</td><td>Complex filtering Scenarios: a message supports setting multiple attributes and can be customized to combine multiple types of expressions according to SQL syntax to classify messages at multiple levels and achieve multi-dimensional filtering.</td></tr></tbody></table><h3 class="anchor anchorWithStickyNavbar_LWe7" id="tag-filtering">Tag Filtering<a href="#tag-filtering" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>Tag has been introduced in the Producers chapter and is used to classify messages under a certain Topic. When sending a message, the producer specifies the Tag of the message, and the consumer has to subscribe according to the Tag already specified.</p><p>Take the following e-commerce transaction scenario as an example, the process from the customer&#x27;s order to the receipt of goods will produce a series of messages, as follows:</p><ul><li>Order News</li><li>Payment News</li><li>Logistics News</li></ul><p>These messages are sent to a Topic with the name Trade_Topic and are subscribed to by various systems, as exemplified by the following:</p><ul><li>Payment system: subscribe to payment messages only.</li><li>Logistics system: subscribe to logistics messages only.</li><li>Real-time calculation system: subscribe to all transaction-related messages.</li><li>Transaction success rate analysis system: subscribe to order and payment messages.</li></ul><p>The filtering schematic is shown below</p><p><img loading="lazy" alt="Tag过滤" src="/assets/images/Tag过滤-844cfe6dd033746c7134bde843021ad6.png" width="2009" height="1320" class="img_ev3q"></p><p>For logistics systems and payment systems, they both subscribe to a single Tag, at which point it is sufficient to mark the Tag when calling the subcribe interface.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe(&quot;TagFilterTest&quot;, &quot;TagA&quot;);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>For a real-time computing system, it subscribes to all messages under the transaction Topic, and the Tag is simply indicated by an asterisk (*).</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe(&quot;TagFilterTest&quot;, &quot;*&quot;);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>For the transaction success rate analysis system, it subscribes to messages for both Order and Payment Tags, and it is fine to separate multiple Tags with two vertical lines (||).</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe(&quot;TagFilterTest&quot;, &quot;TagA||TagB&quot;);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>It should be noted here that if the same consumer subscribes to a Tag under a Topic multiple times, the last subscription will prevail.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">//In the following error code, the Consumer can only subscribe to the message of TagB under TagFilterTest, but not the message of TagA.</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe(&quot;TagFilterTest&quot;, &quot;TagA&quot;);</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe(&quot;TagFilterTest&quot;, &quot;TagB&quot;);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h3 class="anchor anchorWithStickyNavbar_LWe7" id="sql92-filtering">SQL92 Filtering<a href="#sql92-filtering" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>SQL92 filtering is to set the Tag or custom attribute of the message when the message is sent, and the consumer subscribes to set the filter expression using SQL syntax to filter the message based on the custom attribute or Tag.</p><blockquote><p>Tag belongs to a special kind of message property, and the property value of Tag is TAGS in the SQL syntax.
Enable property filtering first set the configuration enablePropertyFilter=true on the Broker side, the value is false by default.</p></blockquote><p>Take the following e-commerce transaction scenario as an example, the process from the customer&#x27;s order to the receipt of goods will produce a series of messages, according to the type of messages into order messages and logistics messages, which define the geographical attributes of logistics messages, according to the region into Hangzhou and Shanghai:</p><ul><li>Order News</li><li>Logistics News<ul><li>Logistics information and the region is Hangzhou</li><li>Logistics information and the region is Shanghai</li></ul></li></ul><p>These messages are sent to the Topic with the name Trade_Topic and are subscribed by various systems, as an example, the following system:</p><ul><li>Logistics system 1: only need to subscribe to the logistics message and the message area is Hangzhou.</li><li>Logistics system 2: only need to subscribe to the logistics news and the news area is Hangzhou or Shanghai.</li><li>Order tracking system: only need to subscribe to order information.</li></ul><p>The SQL92 filtering schematic is shown below:</p><p><img loading="lazy" alt="SQL92过滤" src="/assets/images/SQL92过滤-716732acb1aad27fc8e7a9e218ebaa65.png" width="2012" height="1369" class="img_ev3q"></p><p>The locale will be set as a custom property in the message.</p><ul><li>Message sender.
Set the custom properties of the message.</li></ul><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">Message msg = new Message(&quot;topic&quot;, &quot;tagA&quot;, &quot;Hello MQ&quot;.getBytes());</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">// Set custom property A with property value 1.</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain">msg.putUserProperties(&quot;a&quot;, &quot;1&quot;);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><ul><li>Message consumer.
Set filter expressions using SQL syntax and filter messages based on custom properties.</li></ul><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.subscribe(&quot;SqlFilterTest&quot;,</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> MessageSelector.bySql(&quot;(TAGS is not null and TAGS in (&#x27;TagA&#x27;, &#x27;TagB&#x27;))&quot; +</span><br></span><span class="token-line" style="color:#393A34"><span class="token plain"> &quot;and (a is not null and a between 0 and 3)&quot;));</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="message-retry-and-dead-letter-queue">Message Retry and Dead-Letter Queue<a href="#message-retry-and-dead-letter-queue" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h2><h3 class="anchor anchorWithStickyNavbar_LWe7" id="message-retry">Message Retry<a href="#message-retry" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>If the Consumer fails to consume a message, RocketMQ will re-pitch the message to the Consumer after the retry interval, and if the message is not successfully consumed after the maximum number of retries, the message will be pitched to the dead message queue.</p><blockquote><p>Message retry is only effective for cluster mode; broadcast mode does not provide the message retry feature. In the broadcast mode, after a failed consumption, the failed message will not be retry and continue to consume new messages.</p></blockquote><ul><li>Maximum number of retries: the maximum number of times a message can be repeatedly delivered after a failed consumption.</li></ul><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setMaxReconsumeTimes(10);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Retry interval: the interval after the message consumption fails to be cast to the Consumer again for consumption, which only works in sequential consumption.</p><div class="language-java codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#393A34;--prism-background-color:#f6f8fa"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-java codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#393A34"><span class="token plain">consumer.setSuspendCurrentQueueTimeMillis(5000);</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>The retry mechanism of order consumption and concurrent consumption is not the same. After the order consumption fails to consume, it will first retry locally on the client side until the maximum number of retries, so as to avoid the failed messages being skipped and consuming the next message and disrupting the order of order consumption, while the concurrent consumption will re-cast the failed messages back to the server after the failed consumption, and then wait for the server to re-cast them back, during which it will normally consume the messages behind the queue.</p><blockquote><p>When concurrent consumption fails, it is not cast back to the original Topic, but to a special Topic named %RETRY%ConsumerGroupName, and each ConsumerGroup in cluster mode will correspond to a special Topic and will subscribe to that Topic.
The difference between the two parameters is as follows</p></blockquote><table><thead><tr><th>Consumption type</th><th>Retry interval</th><th>Maximum number of retries</th></tr></thead><tbody><tr><td>Order consumption</td><td>The retry interval time is configured with the custom parameter SuspendCurrentQueueTimeMillis</td><td>The maximum number of retries can be configured with the custom parameter MaxReconsumeTimes. There is no maximum limit to the value of this parameter. If the parameter is not set, the default maximum number of retries is Integer.MAX .</td></tr><tr><td>Concurrent consumption</td><td>The retry interval time changes in steps according to the number of retries, the value range: 1 second ~ 2 hours. Custom configuration is not supported</td><td>The maximum number of retries can be configured by the custom parameter MaxReconsumeTimes. The default value is 16 times. There is no maximum limit for this parameter, and it is recommended to use the default value.</td></tr></tbody></table><p>The retry interval for concurrent consumption is as follows, which can be seen to be exactly the same as the time when the third level of delayed messages starts.</p><table><thead><tr><th>Retry number of times</th><th>The time between the last retry</th><th>Retry number of times</th><th>The time between the last retry</th></tr></thead><tbody><tr><td>1</td><td>10s</td><td>9</td><td>7min</td></tr><tr><td>2</td><td>30s</td><td>10</td><td>8min</td></tr><tr><td>3</td><td>1min</td><td>11</td><td>9min</td></tr><tr><td>4</td><td>2min</td><td>12</td><td>10min</td></tr><tr><td>5</td><td>3min</td><td>13</td><td>20min</td></tr><tr><td>6</td><td>4min</td><td>14</td><td>30min</td></tr><tr><td>7</td><td>5min</td><td>15</td><td>1h</td></tr><tr><td>8</td><td>6min</td><td>16</td><td>2h</td></tr></tbody></table><h3 class="anchor anchorWithStickyNavbar_LWe7" id="dead-letter-queue">Dead-Letter Queue<a href="#dead-letter-queue" class="hash-link" aria-label="Direct link to heading" title="Direct link to heading"></a></h3><p>When a message fails to be consumed for the first time, RocketMQ will automatically retry the message. After reaching the maximum number of retries, if the consumption still fails, it means that the consumer cannot consume the message correctly under normal circumstances. At this point, the message is not immediately discarded, but sent to a special queue corresponding to that consumer, which is called a Dead-Letter Message, and the special queue storing the dead message is called a Dead-Letter Queue, which is a separate queue with a unique number of partitions under the Dead-Letter Topic. If a Dead-Letter Message is generated, the corresponding ConsumerGroup&#x27;s Dead-Letter Topic name is %DLQ%ConsumerGroupName, and the messages in the Dead-Letter Queue will not be consumed again. You can use RocketMQ Admin tool or RocketMQ Dashboard to find out the information of the corresponding dead message.</p></div><footer class="theme-doc-footer docusaurus-mt-lg"><div class="theme-doc-footer-edit-meta-row row"><div class="col"><a href="https://github.com/apache/rocketmq-site/tree/new-official-website/docs/03-consumer/02push.md" target="_blank" rel="noreferrer noopener" class="theme-edit-this-page"><svg fill="currentColor" height="20" width="20" viewBox="0 0 40 40" class="iconEdit_Z9Sw" aria-hidden="true"><g><path d="m34.5 11.7l-3 3.1-6.3-6.3 3.1-3q0.5-0.5 1.2-0.5t1.1 0.5l3.9 3.9q0.5 0.4 0.5 1.1t-0.5 1.2z m-29.5 17.1l18.4-18.5 6.3 6.3-18.4 18.4h-6.3v-6.2z"></path></g></svg>Edit this page</a></div><div class="col lastUpdated_vwxv"></div></div></footer></article><nav class="pagination-nav docusaurus-mt-lg" aria-label="Docs pages navigation"><a class="pagination-nav__link pagination-nav__link--prev" href="/docs/4.x/consumer/01concept2"><div class="pagination-nav__sublabel">Previous</div><div class="pagination-nav__label">Core Concept</div></a><a class="pagination-nav__link pagination-nav__link--next" href="/docs/4.x/consumer/03pull"><div class="pagination-nav__sublabel">Next</div><div class="pagination-nav__label">Pull Consumer</div></a></nav></div></div><div class="col col--3"><div class="tableOfContents_bqdL thin-scrollbar theme-doc-toc-desktop"><ul class="table-of-contents table-of-contents__left-border"><li><a href="#cluster-and-broadcast-mode" class="table-of-contents__link toc-highlight">Cluster and Broadcast Mode</a></li><li><a href="#concurrent-consumption-and-order-consumption" class="table-of-contents__link toc-highlight">Concurrent Consumption and Order Consumption</a></li><li><a href="#message-filtering" class="table-of-contents__link toc-highlight">Message Filtering</a><ul><li><a href="#tag-filtering" class="table-of-contents__link toc-highlight">Tag Filtering</a></li><li><a href="#sql92-filtering" class="table-of-contents__link toc-highlight">SQL92 Filtering</a></li></ul></li><li><a href="#message-retry-and-dead-letter-queue" class="table-of-contents__link toc-highlight">Message Retry and Dead-Letter Queue</a><ul><li><a href="#message-retry" class="table-of-contents__link toc-highlight">Message Retry</a></li><li><a href="#dead-letter-queue" class="table-of-contents__link toc-highlight">Dead-Letter Queue</a></li></ul></li></ul></div></div></div></div></main></div></div><footer class="footer footer--dark"><div class="container container-fluid"><div class="row footer__links"><div class="col footer__col"><div class="footer__title">Learn</div><ul class="footer__items clean-list"><li class="footer__item"><a class="footer__link-item" href="/docs/domainModel/01main">Introduction</a></li><li class="footer__item"><a class="footer__link-item" href="/docs/quickStart/01quickstart">Installation</a></li><li class="footer__item"><a class="footer__link-item" href="/version">Migration from 4.x to 5.0</a></li></ul></div><div class="col footer__col"><div class="footer__title">Community</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://twitter.com/ApacheRocketMQ" target="_blank" rel="noopener noreferrer" class="footer__link-item">Twitter<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://github.com/apache/rocketmq" target="_blank" rel="noopener noreferrer" class="footer__link-item">Github</a></li><li class="footer__item"><a href="https://github.com/apache/rocketmq" target="_blank" rel="noopener noreferrer" class="footer__link-item">Help</a></li></ul></div><div class="col footer__col"><div class="footer__title">More</div><ul class="footer__items clean-list"><li class="footer__item"><a class="footer__link-item" href="/blog">Blog</a></li><li class="footer__item"><a class="footer__link-item" href="/release-notes">Changelog</a></li><li class="footer__item"><a href="https://github.com/apache/rocketmq" target="_blank" rel="noopener noreferrer" class="footer__link-item">GitHub<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://twitter.com/docusaurus" target="_blank" rel="noopener noreferrer" class="footer__link-item">Twitter<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li></ul></div><div class="col footer__col"><div class="footer__title">Legal</div><ul class="footer__items clean-list"><li class="footer__item"><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener noreferrer" class="footer__link-item">Licenses<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://www.apache.org/security/" target="_blank" rel="noopener noreferrer" class="footer__link-item">Security<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer" class="footer__link-item">Thanks<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li class="footer__item"><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer" class="footer__link-item">Sponsorship<svg width="13.5" height="13.5" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li></ul></div></div><div class="footer__bottom text--center"><div class="margin-bottom--sm"><a href="https://rocketmq.apache.org/" rel="noopener noreferrer" class="footerLogoLink_BH7S"><img src="/img/Apache_RocketMQ_logo.svg.png" alt="Meta Open Source Logo" class="themedImage_ToTc themedImage--light_HNdA footer__logo"><img src="/img/Apache_RocketMQ_logo.svg.png" alt="Meta Open Source Logo" class="themedImage_ToTc themedImage--dark_i4oU footer__logo"></a></div><div class="footer__copyright">Copyright © 2022 The Apache Software Foundation. Licensed under the Apache License, Version 2.0.</div></div></div></footer></div>
<script src="/assets/js/runtime~main.9fb1bb92.js"></script>
<script src="/assets/js/main.db9ae330.js"></script>
</body>
</html>