<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Pulsar Go client · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="&gt; Tips: The CGo client has been deprecated since version 2.7.0."/><meta name="docsearch:version" content="2.10.0"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Pulsar Go client · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="&gt; Tips: The CGo client has been deprecated since version 2.7.0."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.10.0</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/client-libraries-go">日本語</a></li><li><a href="/docs/fr/client-libraries-go">Français</a></li><li><a href="/docs/ko/client-libraries-go">한국어</a></li><li><a href="/docs/zh-CN/client-libraries-go">中文</a></li><li><a href="/docs/zh-TW/client-libraries-go">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
        const languagesMenuItem = document.getElementById("languages-menu");
        const languagesDropDown = document.getElementById("languages-dropdown");
        languagesMenuItem.addEventListener("click", function(event) {
          event.preventDefault();

          if (languagesDropDown.className == "hide") {
            languagesDropDown.className = "visible";
          } else {
            languagesDropDown.className = "hide";
          }
        });
      </script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i>›</i><span>Client Libraries</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/getting-started-helm">Run Pulsar in Kubernetes</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-proxy-sni-routing">Proxy support with SNI routing</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-multiple-advertised-listeners">Multiple advertised listeners</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-package">How-to: Package</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-cdc">CDC connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Tiered Storage</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-aws">AWS S3 offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-gcs">GCS offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-filesystem">Filesystem offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-azure">Azure BlobStore offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-aliyun">Aliyun OSS offloader</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Transactions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/txn-why">Why transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/txn-what">What are transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/txn-how">How transactions work?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/txn-use">How to use transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/txn-monitor">How to monitor transactions?</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-dcos">DC/OS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-docker">Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-monitoring">Monitor</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-isolation">Pulsar isolation</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-policy-and-supported-versions">Security Policy and Supported Versions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-tls-keystore">Using TLS with KeyStore configure</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-oauth2">Authentication using OAuth 2.0 access tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-extending">Extend Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-bouncy-castle">Bouncy Castle Providers</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Performance</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/performance-pulsar-perf">Pulsar Perf</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-java">Java</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-node">Node.js</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-websocket">WebSocket</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-dotnet">C#</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-rest">REST</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-topics">Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-functions">Functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-packages">Packages</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/develop-plugin">Plugin</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
            var coll = document.getElementsByClassName('collapsible');
            var checkActiveCategory = true;
            for (var i = 0; i < coll.length; i++) {
              var links = coll[i].nextElementSibling.getElementsByTagName('*');
              if (checkActiveCategory){
                for (var j = 0; j < links.length; j++) {
                  if (links[j].classList.contains('navListItemActive')){
                    coll[i].nextElementSibling.classList.toggle('hide');
                    coll[i].childNodes[1].classList.toggle('rotate');
                    checkActiveCategory = false;
                    break;
                  }
                }
              }

              coll[i].addEventListener('click', function() {
                var arrow = this.childNodes[1];
                arrow.classList.toggle('rotate');
                var content = this.nextElementSibling;
                content.classList.toggle('hide');
              });
            }

            document.addEventListener('DOMContentLoaded', function() {
              createToggler('#navToggler', '#docsNav', 'docsSliderActive');
              createToggler('#tocToggler', 'body', 'tocActive');

              var headings = document.querySelector('.toc-headings');
              headings && headings.addEventListener('click', function(event) {
                var el = event.target;
                while(el !== headings){
                  if (el.tagName === 'A') {
                    document.body.classList.remove('tocActive');
                    break;
                  } else{
                    el = el.parentNode;
                  }
                }
              }, false);

              function createToggler(togglerSelector, targetSelector, className) {
                var toggler = document.querySelector(togglerSelector);
                var target = document.querySelector(targetSelector);

                if (!toggler) {
                  return;
                }

                toggler.onclick = function(event) {
                  event.preventDefault();

                  target.classList.toggle(className);
                };
              }
            });
        </script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/client-libraries-go.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Pulsar Go client</h1></header><article><div><span><blockquote>
<p>Tips: The CGo client has been deprecated since version 2.7.0.</p>
</blockquote>
<p>You can use Pulsar <a href="https://github.com/apache/pulsar-client-go">Go client</a> to create Pulsar <a href="#producers">producers</a>, <a href="#consumers">consumers</a>, and <a href="#readers">readers</a> in Go (aka Golang).</p>
<blockquote>
<p><strong>API docs available as well</strong>  <br>
For standard API docs, consult the <a href="https://godoc.org/github.com/apache/pulsar-client-go/pulsar">Godoc</a>.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="installation"></a><a href="#installation" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Installation</h2>
<h3><a class="anchor" aria-hidden="true" id="install-go-package"></a><a href="#install-go-package" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Install go package</h3>
<p>You can get the <code>pulsar</code> library by using <code>go get</code> or use it with <code>go module</code>.</p>
<p>Download the library of Go client to local environment:</p>
<pre><code class="hljs css language-bash">$ go get -u <span class="hljs-string">"github.com/apache/pulsar-client-go/pulsar"</span>
</code></pre>
<p>Once installed locally, you can import it into your project:</p>
<pre><code class="hljs css language-go"><span class="hljs-keyword">import</span> <span class="hljs-string">"github.com/apache/pulsar-client-go/pulsar"</span>
</code></pre>
<p>Use with go module:</p>
<pre><code class="hljs css language-bash">$ mkdir test_dir &amp;&amp; <span class="hljs-built_in">cd</span> test_dir 
</code></pre>
<p>Write a sample script in the <code>test_dir</code> directory (such as <code>test_example.go</code>) and write <code>package main</code> at the beginning of the file.</p>
<pre><code class="hljs css language-bash">$ go mod init test_dir 
$ go mod tidy &amp;&amp; go mod download
$ go build test_example.go
$ ./test_example
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="connection-urls"></a><a href="#connection-urls" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connection URLs</h2>
<p>To connect to Pulsar using client libraries, you need to specify a <a href="/docs/en/developing-binary-protocol">Pulsar protocol</a> URL.</p>
<p>Pulsar protocol URLs are assigned to specific clusters, use the <code>pulsar</code> scheme and have a default port of 6650. Here's an example for <code>localhost</code>:</p>
<pre><code class="hljs css language-http">pulsar://localhost:6650
</code></pre>
<p>If you have multiple brokers, you can set the URL as below.</p>
<pre><code class="hljs">pulsar://localhos<span class="hljs-variable">t:6550</span>,localhos<span class="hljs-variable">t:6651</span>,localhos<span class="hljs-variable">t:6652</span>
</code></pre>
<p>A URL for a production Pulsar cluster may look something like this:</p>
<pre><code class="hljs css language-http">pulsar://pulsar.us-west.example.com:6650
</code></pre>
<p>If you're using <a href="/docs/en/security-tls-authentication">TLS</a> authentication, the URL will look like something like this:</p>
<pre><code class="hljs css language-http">pulsar+ssl://pulsar.us-west.example.com:6651
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="create-a-client"></a><a href="#create-a-client" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create a client</h2>
<p>In order to interact with Pulsar, you'll first need a <code>Client</code> object. You can create a client object using the <code>NewClient</code> function, passing in a <code>ClientOptions</code> object (more on configuration <a href="#client-configuration">below</a>). Here's an example:</p>
<pre><code class="hljs css language-go"><span class="hljs-keyword">import</span> (
    <span class="hljs-string">"log"</span>
    <span class="hljs-string">"time"</span>

    <span class="hljs-string">"github.com/apache/pulsar-client-go/pulsar"</span>
)

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">main</span><span class="hljs-params">()</span></span> {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               <span class="hljs-string">"pulsar://localhost:6650"</span>,
        OperationTimeout:  <span class="hljs-number">30</span> * time.Second,
        ConnectionTimeout: <span class="hljs-number">30</span> * time.Second,
    })
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatalf(<span class="hljs-string">"Could not instantiate Pulsar client: %v"</span>, err)
    }

    <span class="hljs-keyword">defer</span> client.Close()
}
</code></pre>
<p>If you have multiple brokers, you can initiate a client object as below.</p>
<pre><code class="hljs">import (
    <span class="hljs-string">"log"</span>
    <span class="hljs-string">"time"</span>
    <span class="hljs-string">"github.com/apache/pulsar-client-go/pulsar"</span>
)

func main() {
    client, <span class="hljs-built_in">err</span> := pulsar.NewClient(pulsar.ClientOptions{
        URL: <span class="hljs-string">"pulsar://localhost:6650,localhost:6651,localhost:6652"</span>,
        OperationTimeout:  <span class="hljs-number">30</span> * <span class="hljs-built_in">time</span>.<span class="hljs-built_in">Second</span>,
        ConnectionTimeout: <span class="hljs-number">30</span> * <span class="hljs-built_in">time</span>.<span class="hljs-built_in">Second</span>,
    })
    <span class="hljs-keyword">if</span> <span class="hljs-built_in">err</span> != nil {
        <span class="hljs-built_in">log</span>.Fatalf(<span class="hljs-string">"Could not instantiate Pulsar client: %v"</span>, <span class="hljs-built_in">err</span>)
    }

    defer client.Close()
}
</code></pre>
<p>The following configurable parameters are available for Pulsar clients:</p>
<table>
<thead>
<tr><th style="text-align:left">Name</th><th style="text-align:left">Description</th><th style="text-align:left">Default</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">URL</td><td style="text-align:left">Configure the service URL for the Pulsar service.<br><br>If you have multiple brokers, you can set multiple Pulsar cluster addresses for a client. <br><br>This parameter is <strong>required</strong>.</td><td style="text-align:left">None</td></tr>
<tr><td style="text-align:left">ConnectionTimeout</td><td style="text-align:left">Timeout for the establishment of a TCP connection</td><td style="text-align:left">30s</td></tr>
<tr><td style="text-align:left">OperationTimeout</td><td style="text-align:left">Set the operation timeout. Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the operation will be marked as failed</td><td style="text-align:left">30s</td></tr>
<tr><td style="text-align:left">Authentication</td><td style="text-align:left">Configure the authentication provider. Example: <code>Authentication: NewAuthenticationTLS(&quot;my-cert.pem&quot;, &quot;my-key.pem&quot;)</code></td><td style="text-align:left">no authentication</td></tr>
<tr><td style="text-align:left">TLSTrustCertsFilePath</td><td style="text-align:left">Set the path to the trusted TLS certificate file</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">TLSAllowInsecureConnection</td><td style="text-align:left">Configure whether the Pulsar client accept untrusted TLS certificate from broker</td><td style="text-align:left">false</td></tr>
<tr><td style="text-align:left">TLSValidateHostname</td><td style="text-align:left">Configure whether the Pulsar client verify the validity of the host name from broker</td><td style="text-align:left">false</td></tr>
<tr><td style="text-align:left">ListenerName</td><td style="text-align:left">Configure the net model for VPC users to connect to the Pulsar broker</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">MaxConnectionsPerBroker</td><td style="text-align:left">Max number of connections to a single broker that is kept in the pool</td><td style="text-align:left">1</td></tr>
<tr><td style="text-align:left">CustomMetricsLabels</td><td style="text-align:left">Add custom labels to all the metrics reported by this client instance</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">Logger</td><td style="text-align:left">Configure the logger used by the client</td><td style="text-align:left">logrus.StandardLogger</td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="producers"></a><a href="#producers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producers</h2>
<p>Pulsar producers publish messages to Pulsar topics. You can <a href="#producer-configuration">configure</a> Go producers using a <code>ProducerOptions</code> object. Here's an example:</p>
<pre><code class="hljs css language-go">producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: <span class="hljs-string">"my-topic"</span>,
})

<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}

_, err = producer.Send(context.Background(), &amp;pulsar.ProducerMessage{
    Payload: []<span class="hljs-keyword">byte</span>(<span class="hljs-string">"hello"</span>),
})

<span class="hljs-keyword">defer</span> producer.Close()

<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    fmt.Println(<span class="hljs-string">"Failed to publish message"</span>, err)
}
fmt.Println(<span class="hljs-string">"Published message"</span>)
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="producer-operations"></a><a href="#producer-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer operations</h3>
<p>Pulsar Go producers have the following methods available:</p>
<table>
<thead>
<tr><th style="text-align:left">Method</th><th style="text-align:left">Description</th><th style="text-align:left">Return type</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>Topic()</code></td><td style="text-align:left">Fetches the producer's <a href="/docs/en/reference-terminology#topic">topic</a></td><td style="text-align:left"><code>string</code></td></tr>
<tr><td style="text-align:left"><code>Name()</code></td><td style="text-align:left">Fetches the producer's name</td><td style="text-align:left"><code>string</code></td></tr>
<tr><td style="text-align:left"><code>Send(context.Context, *ProducerMessage)</code></td><td style="text-align:left">Publishes a <a href="#messages">message</a> to the producer's topic. This call will block until the message is successfully acknowledged by the Pulsar broker, or an error will be thrown if the timeout set using the <code>SendTimeout</code> in the producer's <a href="#producer-configuration">configuration</a> is exceeded.</td><td style="text-align:left">(MessageID, error)</td></tr>
<tr><td style="text-align:left"><code>SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))</code></td><td style="text-align:left">Send a message, this call will be blocking until is successfully acknowledged by the Pulsar broker.</td></tr>
<tr><td style="text-align:left"><code>LastSequenceID()</code></td><td style="text-align:left">Get the last sequence id that was published by this producer. his represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that was published and acknowledged by the broker.</td><td style="text-align:left">int64</td></tr>
<tr><td style="text-align:left"><code>Flush()</code></td><td style="text-align:left">Flush all the messages buffered in the client and wait until all messages have been successfully persisted.</td><td style="text-align:left">error</td></tr>
<tr><td style="text-align:left"><code>Close()</code></td><td style="text-align:left">Closes the producer and releases all resources allocated to it. If <code>Close()</code> is called then no more messages will be accepted from the publisher. This method will block until all pending publish requests have been persisted by Pulsar. If an error is thrown, no pending writes will be retried.</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="producer-example"></a><a href="#producer-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer Example</h3>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-message-router-in-producer"></a><a href="#how-to-use-message-router-in-producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use message router in producer</h4>
<pre><code class="hljs css language-go">client, err := NewClient(pulsar.ClientOptions{
    URL: serviceURL,
})

<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> client.Close()

<span class="hljs-comment">// Only subscribe on the specific partition</span>
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            <span class="hljs-string">"my-partitioned-topic-partition-2"</span>,
    SubscriptionName: <span class="hljs-string">"my-sub"</span>,
})

<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> consumer.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: <span class="hljs-string">"my-partitioned-topic"</span>,
    MessageRouter: <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">(msg *ProducerMessage, tm TopicMetadata)</span> <span class="hljs-title">int</span></span> {
        fmt.Println(<span class="hljs-string">"Routing message "</span>, msg, <span class="hljs-string">" -- Partitions: "</span>, tm.NumPartitions())
        <span class="hljs-keyword">return</span> <span class="hljs-number">2</span>
    },
})

<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> producer.Close()
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-schema-interface-in-producer"></a><a href="#how-to-use-schema-interface-in-producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use schema interface in producer</h4>
<pre><code class="hljs css language-go"><span class="hljs-keyword">type</span> testJSON <span class="hljs-keyword">struct</span> {
    ID   <span class="hljs-keyword">int</span>    <span class="hljs-string">`json:"id"`</span>
    Name <span class="hljs-keyword">string</span> <span class="hljs-string">`json:"name"`</span>
}
</code></pre>
<pre><code class="hljs css language-go"><span class="hljs-keyword">var</span> (
    exampleSchemaDef = <span class="hljs-string">"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","</span> +
        <span class="hljs-string">"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"</span>
)
</code></pre>
<pre><code class="hljs css language-go">client, err := NewClient(pulsar.ClientOptions{
    URL: <span class="hljs-string">"pulsar://localhost:6650"</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> client.Close()

properties := <span class="hljs-built_in">make</span>(<span class="hljs-keyword">map</span>[<span class="hljs-keyword">string</span>]<span class="hljs-keyword">string</span>)
properties[<span class="hljs-string">"pulsar"</span>] = <span class="hljs-string">"hello"</span>
jsonSchemaWithProperties := NewJSONSchema(exampleSchemaDef, properties)
producer, err := client.CreateProducer(ProducerOptions{
    Topic:  <span class="hljs-string">"jsonTopic"</span>,
    Schema: jsonSchemaWithProperties,
})
assert.Nil(t, err)

_, err = producer.Send(context.Background(), &amp;ProducerMessage{
    Value: &amp;testJSON{
        ID:   <span class="hljs-number">100</span>,
        Name: <span class="hljs-string">"pulsar"</span>,
    },
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
producer.Close()
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-delay-relative-in-producer"></a><a href="#how-to-use-delay-relative-in-producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use delay relative in producer</h4>
<pre><code class="hljs css language-go">client, err := NewClient(pulsar.ClientOptions{
    URL: <span class="hljs-string">"pulsar://localhost:6650"</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> client.Close()

topicName := newTopicName()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           topicName,
    DisableBatching: <span class="hljs-literal">true</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> producer.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            topicName,
    SubscriptionName: <span class="hljs-string">"subName"</span>,
    Type:             Shared,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> consumer.Close()

ID, err := producer.Send(context.Background(), &amp;pulsar.ProducerMessage{
    Payload:      []<span class="hljs-keyword">byte</span>(fmt.Sprintf(<span class="hljs-string">"test"</span>)),
    DeliverAfter: <span class="hljs-number">3</span> * time.Second,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
fmt.Println(ID)

ctx, canc := context.WithTimeout(context.Background(), <span class="hljs-number">1</span>*time.Second)
msg, err := consumer.Receive(ctx)
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
fmt.Println(msg.Payload())
canc()

ctx, canc = context.WithTimeout(context.Background(), <span class="hljs-number">5</span>*time.Second)
msg, err = consumer.Receive(ctx)
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
fmt.Println(msg.Payload())
canc()
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-prometheus-metrics-in-producer"></a><a href="#how-to-use-prometheus-metrics-in-producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use Prometheus metrics in producer</h4>
<p>Pulsar Go client registers client metrics using Prometheus. This section demonstrates how to create a simple Pulsar producer application that exposes Prometheus metrics via HTTP.</p>
<ol>
<li>Write a simple producer application.</li>
</ol>
<pre><code class="hljs css language-go"><span class="hljs-comment">// Create a Pulsar client</span>
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: <span class="hljs-string">"pulsar://localhost:6650"</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}

<span class="hljs-keyword">defer</span> client.Close()

<span class="hljs-comment">// Start a separate goroutine for Prometheus metrics</span>
<span class="hljs-comment">// In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics</span>
<span class="hljs-keyword">go</span> <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">()</span></span> {
    prometheusPort := <span class="hljs-number">2112</span>
    log.Printf(<span class="hljs-string">"Starting Prometheus metrics at http://localhost:%v/metrics\n"</span>, prometheusPort)
    http.Handle(<span class="hljs-string">"/metrics"</span>, promhttp.Handler())
    err = http.ListenAndServe(<span class="hljs-string">":"</span>+strconv.Itoa(prometheusPort), <span class="hljs-literal">nil</span>)
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatal(err)
    }
}()

<span class="hljs-comment">// Create a producer</span>
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: <span class="hljs-string">"topic-1"</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}

<span class="hljs-keyword">defer</span> producer.Close()

ctx := context.Background()

<span class="hljs-comment">// Write your business logic here</span>
<span class="hljs-comment">// In this case, you build a simple Web server. You can produce messages by requesting http://localhost:8082/produce</span>
webPort := <span class="hljs-number">8082</span>
http.HandleFunc(<span class="hljs-string">"/produce"</span>, <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">(w http.ResponseWriter, r *http.Request)</span></span> {
    msgId, err := producer.Send(ctx, &amp;pulsar.ProducerMessage{
        Payload: []<span class="hljs-keyword">byte</span>(fmt.Sprintf(<span class="hljs-string">"hello world"</span>)),
    })
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatal(err)
    } <span class="hljs-keyword">else</span> {
        log.Printf(<span class="hljs-string">"Published message: %v"</span>, msgId)
        fmt.Fprintf(w, <span class="hljs-string">"Published message: %v"</span>, msgId)
    }
})

err = http.ListenAndServe(<span class="hljs-string">":"</span>+strconv.Itoa(webPort), <span class="hljs-literal">nil</span>)
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
</code></pre>
<ol start="2">
<li>To scrape metrics from applications, configure a local running Prometheus instance using a configuration file (<code>prometheus.yml</code>).</li>
</ol>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">scrape_configs:</span>
<span class="hljs-bullet">-</span> <span class="hljs-attr">job_name:</span> <span class="hljs-string">pulsar-client-go-metrics</span>
  <span class="hljs-attr">scrape_interval:</span> <span class="hljs-string">10s</span>
  <span class="hljs-attr">static_configs:</span>
  <span class="hljs-bullet">-</span> <span class="hljs-attr">targets:</span>
    <span class="hljs-bullet">-</span> <span class="hljs-string">localhost:2112</span>
</code></pre>
<p>Now you can query Pulsar client metrics on Prometheus.</p>
<h3><a class="anchor" aria-hidden="true" id="producer-configuration"></a><a href="#producer-configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer configuration</h3>
<table>
<thead>
<tr><th style="text-align:left">Name</th><th style="text-align:left">Description</th><th style="text-align:left">Default</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">Topic</td><td style="text-align:left">Topic specify the topic this consumer will subscribe to. This argument is required when constructing the reader.</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">Name</td><td style="text-align:left">Name specify a name for the producer. If not assigned, the system will generate a globally unique name which can be access with Producer.ProducerName().</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">Properties</td><td style="text-align:left">Properties attach a set of application defined properties to the producer This properties will be visible in the topic stats</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">SendTimeout</td><td style="text-align:left">SendTimeout set the timeout for a message that is not acknowledged by the server</td><td style="text-align:left">30s</td></tr>
<tr><td style="text-align:left">DisableBlockIfQueueFull</td><td style="text-align:left">DisableBlockIfQueueFull control whether Send and SendAsync block if producer's message queue is full</td><td style="text-align:left">false</td></tr>
<tr><td style="text-align:left">MaxPendingMessages</td><td style="text-align:left">MaxPendingMessages set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">HashingScheme</td><td style="text-align:left">HashingScheme change the <code>HashingScheme</code> used to chose the partition on where to publish a particular message.</td><td style="text-align:left">JavaStringHash</td></tr>
<tr><td style="text-align:left">CompressionType</td><td style="text-align:left">CompressionType set the compression type for the producer.</td><td style="text-align:left">not compressed</td></tr>
<tr><td style="text-align:left">CompressionLevel</td><td style="text-align:left">Define the desired compression level. Options: Default, Faster and Better</td><td style="text-align:left">Default</td></tr>
<tr><td style="text-align:left">MessageRouter</td><td style="text-align:left">MessageRouter set a custom message routing policy by passing an implementation of MessageRouter</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">DisableBatching</td><td style="text-align:left">DisableBatching control whether automatic batching of messages is enabled for the producer.</td><td style="text-align:left">false</td></tr>
<tr><td style="text-align:left">BatchingMaxPublishDelay</td><td style="text-align:left">BatchingMaxPublishDelay set the time period within which the messages sent will be batched</td><td style="text-align:left">1ms</td></tr>
<tr><td style="text-align:left">BatchingMaxMessages</td><td style="text-align:left">BatchingMaxMessages set the maximum number of messages permitted in a batch.</td><td style="text-align:left">1000</td></tr>
<tr><td style="text-align:left">BatchingMaxSize</td><td style="text-align:left">BatchingMaxSize sets the maximum number of bytes permitted in a batch.</td><td style="text-align:left">128KB</td></tr>
<tr><td style="text-align:left">Schema</td><td style="text-align:left">Schema set a custom schema type by passing an implementation of <code>Schema</code></td><td style="text-align:left">bytes[]</td></tr>
<tr><td style="text-align:left">Interceptors</td><td style="text-align:left">A chain of interceptors. These interceptors are called at some points defined in the <code>ProducerInterceptor</code> interface.</td><td style="text-align:left">None</td></tr>
<tr><td style="text-align:left">MaxReconnectToBroker</td><td style="text-align:left">MaxReconnectToBroker set the maximum retry number of reconnectToBroker</td><td style="text-align:left">ultimate</td></tr>
<tr><td style="text-align:left">BatcherBuilderType</td><td style="text-align:left">BatcherBuilderType sets the batch builder type. This is used to create a batch container when batching is enabled. Options: DefaultBatchBuilder and KeyBasedBatchBuilder</td><td style="text-align:left">DefaultBatchBuilder</td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="consumers"></a><a href="#consumers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumers</h2>
<p>Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can <a href="#consumer-configuration">configure</a> Go consumers using a <code>ConsumerOptions</code> object. Here's a basic example that uses channels:</p>
<pre><code class="hljs css language-go">consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            <span class="hljs-string">"topic-1"</span>,
    SubscriptionName: <span class="hljs-string">"my-sub"</span>,
    Type:             pulsar.Shared,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> consumer.Close()

<span class="hljs-keyword">for</span> i := <span class="hljs-number">0</span>; i &lt; <span class="hljs-number">10</span>; i++ {
    msg, err := consumer.Receive(context.Background())
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatal(err)
    }

    fmt.Printf(<span class="hljs-string">"Received message msgId: %#v -- content: '%s'\n"</span>,
        msg.ID(), <span class="hljs-keyword">string</span>(msg.Payload()))

    consumer.Ack(msg)
}

<span class="hljs-keyword">if</span> err := consumer.Unsubscribe(); err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="consumer-operations"></a><a href="#consumer-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer operations</h3>
<p>Pulsar Go consumers have the following methods available:</p>
<table>
<thead>
<tr><th style="text-align:left">Method</th><th style="text-align:left">Description</th><th style="text-align:left">Return type</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>Subscription()</code></td><td style="text-align:left">Returns the consumer's subscription name</td><td style="text-align:left"><code>string</code></td></tr>
<tr><td style="text-align:left"><code>Unsubcribe()</code></td><td style="text-align:left">Unsubscribes the consumer from the assigned topic. Throws an error if the unsubscribe operation is somehow unsuccessful.</td><td style="text-align:left"><code>error</code></td></tr>
<tr><td style="text-align:left"><code>Receive(context.Context)</code></td><td style="text-align:left">Receives a single message from the topic. This method blocks until a message is available.</td><td style="text-align:left"><code>(Message, error)</code></td></tr>
<tr><td style="text-align:left"><code>Chan()</code></td><td style="text-align:left">Chan returns a channel from which to consume messages.</td><td style="text-align:left"><code>&lt;-chan ConsumerMessage</code></td></tr>
<tr><td style="text-align:left"><code>Ack(Message)</code></td><td style="text-align:left"><a href="/docs/en/reference-terminology#acknowledgment-ack">Acknowledges</a> a message to the Pulsar <a href="/docs/en/reference-terminology#broker">broker</a></td></tr>
<tr><td style="text-align:left"><code>AckID(MessageID)</code></td><td style="text-align:left"><a href="/docs/en/reference-terminology#acknowledgment-ack">Acknowledges</a> a message to the Pulsar <a href="/docs/en/reference-terminology#broker">broker</a> by message ID</td></tr>
<tr><td style="text-align:left"><code>ReconsumeLater(msg Message, delay time.Duration)</code></td><td style="text-align:left">ReconsumeLater mark a message for redelivery after custom delay</td></tr>
<tr><td style="text-align:left"><code>Nack(Message)</code></td><td style="text-align:left">Acknowledge the failure to process a single message.</td></tr>
<tr><td style="text-align:left"><code>NackID(MessageID)</code></td><td style="text-align:left">Acknowledge the failure to process a single message.</td></tr>
<tr><td style="text-align:left"><code>Seek(msgID MessageID)</code></td><td style="text-align:left">Reset the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last messages in the topic.</td><td style="text-align:left"><code>error</code></td></tr>
<tr><td style="text-align:left"><code>SeekByTime(time time.Time)</code></td><td style="text-align:left">Reset the subscription associated with this consumer to a specific message publish time.</td><td style="text-align:left"><code>error</code></td></tr>
<tr><td style="text-align:left"><code>Close()</code></td><td style="text-align:left">Closes the consumer, disabling its ability to receive messages from the broker</td></tr>
<tr><td style="text-align:left"><code>Name()</code></td><td style="text-align:left">Name returns the name of consumer</td><td style="text-align:left"><code>string</code></td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="receive-example"></a><a href="#receive-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Receive example</h3>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-regex-consumer"></a><a href="#how-to-use-regex-consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use regex consumer</h4>
<pre><code class="hljs css language-go">client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: <span class="hljs-string">"pulsar://localhost:6650"</span>,
})

<span class="hljs-keyword">defer</span> client.Close()

p, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           topicInRegex,
    DisableBatching: <span class="hljs-literal">true</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> p.Close()

topicsPattern := fmt.Sprintf(<span class="hljs-string">"persistent://%s/foo.*"</span>, namespace)
opts := pulsar.ConsumerOptions{
    TopicsPattern:    topicsPattern,
    SubscriptionName: <span class="hljs-string">"regex-sub"</span>,
}
consumer, err := client.Subscribe(opts)
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> consumer.Close()
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-multi-topics-consumer"></a><a href="#how-to-use-multi-topics-consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use multi topics Consumer</h4>
<pre><code class="hljs css language-go"><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">newTopicName</span><span class="hljs-params">()</span> <span class="hljs-title">string</span></span> {
    <span class="hljs-keyword">return</span> fmt.Sprintf(<span class="hljs-string">"my-topic-%v"</span>, time.Now().Nanosecond())
}


topic1 := <span class="hljs-string">"topic-1"</span>
topic2 := <span class="hljs-string">"topic-2"</span>

client, err := NewClient(pulsar.ClientOptions{
    URL: <span class="hljs-string">"pulsar://localhost:6650"</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
topics := []<span class="hljs-keyword">string</span>{topic1, topic2}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topics:           topics,
    SubscriptionName: <span class="hljs-string">"multi-topic-sub"</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> consumer.Close()
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-consumer-listener"></a><a href="#how-to-use-consumer-listener" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use consumer listener</h4>
<pre><code class="hljs css language-go"><span class="hljs-keyword">import</span> (
    <span class="hljs-string">"fmt"</span>
    <span class="hljs-string">"log"</span>

    <span class="hljs-string">"github.com/apache/pulsar-client-go/pulsar"</span>
)

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">main</span><span class="hljs-params">()</span></span> {
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: <span class="hljs-string">"pulsar://localhost:6650"</span>})
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatal(err)
    }

    <span class="hljs-keyword">defer</span> client.Close()

    channel := <span class="hljs-built_in">make</span>(<span class="hljs-keyword">chan</span> pulsar.ConsumerMessage, <span class="hljs-number">100</span>)

    options := pulsar.ConsumerOptions{
        Topic:            <span class="hljs-string">"topic-1"</span>,
        SubscriptionName: <span class="hljs-string">"my-subscription"</span>,
        Type:             pulsar.Shared,
    }

    options.MessageChannel = channel

    consumer, err := client.Subscribe(options)
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatal(err)
    }

    <span class="hljs-keyword">defer</span> consumer.Close()

    <span class="hljs-comment">// Receive messages from channel. The channel returns a struct which contains message and the consumer from where</span>
    <span class="hljs-comment">// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be</span>
    <span class="hljs-comment">// shared across multiple consumers as well</span>
    <span class="hljs-keyword">for</span> cm := <span class="hljs-keyword">range</span> channel {
        msg := cm.Message
        fmt.Printf(<span class="hljs-string">"Received message  msgId: %v -- content: '%s'\n"</span>,
            msg.ID(), <span class="hljs-keyword">string</span>(msg.Payload()))

        consumer.Ack(msg)
    }
}
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-consumer-receive-timeout"></a><a href="#how-to-use-consumer-receive-timeout" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use consumer receive timeout</h4>
<pre><code class="hljs css language-go">client, err := NewClient(pulsar.ClientOptions{
    URL: <span class="hljs-string">"pulsar://localhost:6650"</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> client.Close()

topic := <span class="hljs-string">"test-topic-with-no-messages"</span>
ctx, cancel := context.WithTimeout(context.Background(), <span class="hljs-number">500</span>*time.Millisecond)
<span class="hljs-keyword">defer</span> cancel()

<span class="hljs-comment">// create consumer</span>
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            topic,
    SubscriptionName: <span class="hljs-string">"my-sub1"</span>,
    Type:             Shared,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> consumer.Close()

msg, err := consumer.Receive(ctx)
fmt.Println(msg.Payload())
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-schema-in-consumer"></a><a href="#how-to-use-schema-in-consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use schema in consumer</h4>
<pre><code class="hljs css language-go"><span class="hljs-keyword">type</span> testJSON <span class="hljs-keyword">struct</span> {
    ID   <span class="hljs-keyword">int</span>    <span class="hljs-string">`json:"id"`</span>
    Name <span class="hljs-keyword">string</span> <span class="hljs-string">`json:"name"`</span>
}
</code></pre>
<pre><code class="hljs css language-go"><span class="hljs-keyword">var</span> (
    exampleSchemaDef = <span class="hljs-string">"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","</span> +
        <span class="hljs-string">"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"</span>
)
</code></pre>
<pre><code class="hljs css language-go">client, err := NewClient(pulsar.ClientOptions{
    URL: <span class="hljs-string">"pulsar://localhost:6650"</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> client.Close()

<span class="hljs-keyword">var</span> s testJSON

consumerJS := NewJSONSchema(exampleSchemaDef, <span class="hljs-literal">nil</span>)
consumer, err := client.Subscribe(ConsumerOptions{
    Topic:                       <span class="hljs-string">"jsonTopic"</span>,
    SubscriptionName:            <span class="hljs-string">"sub-1"</span>,
    Schema:                      consumerJS,
    SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
err = msg.GetSchemaValue(&amp;s)
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}

<span class="hljs-keyword">defer</span> consumer.Close()
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-prometheus-metrics-in-consumer"></a><a href="#how-to-use-prometheus-metrics-in-consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use Prometheus metrics in consumer</h4>
<p>In this guide, This section demonstrates how to create a simple Pulsar consumer application that exposes Prometheus metrics via HTTP.</p>
<ol>
<li>Write a simple consumer application.</li>
</ol>
<pre><code class="hljs css language-go"><span class="hljs-comment">// Create a Pulsar client</span>
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: <span class="hljs-string">"pulsar://localhost:6650"</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}

<span class="hljs-keyword">defer</span> client.Close()

<span class="hljs-comment">// Start a separate goroutine for Prometheus metrics</span>
<span class="hljs-comment">// In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics</span>
<span class="hljs-keyword">go</span> <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">()</span></span> {
    prometheusPort := <span class="hljs-number">2112</span>
    log.Printf(<span class="hljs-string">"Starting Prometheus metrics at http://localhost:%v/metrics\n"</span>, prometheusPort)
    http.Handle(<span class="hljs-string">"/metrics"</span>, promhttp.Handler())
    err = http.ListenAndServe(<span class="hljs-string">":"</span>+strconv.Itoa(prometheusPort), <span class="hljs-literal">nil</span>)
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatal(err)
    }
}()

<span class="hljs-comment">// Create a consumer</span>
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            <span class="hljs-string">"topic-1"</span>,
    SubscriptionName: <span class="hljs-string">"sub-1"</span>,
    Type:             pulsar.Shared,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}

<span class="hljs-keyword">defer</span> consumer.Close()

ctx := context.Background()

<span class="hljs-comment">// Write your business logic here</span>
<span class="hljs-comment">// In this case, you build a simple Web server. You can consume messages by requesting http://localhost:8083/consume</span>
webPort := <span class="hljs-number">8083</span>
http.HandleFunc(<span class="hljs-string">"/consume"</span>, <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">(w http.ResponseWriter, r *http.Request)</span></span> {
    msg, err := consumer.Receive(ctx)
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatal(err)
    } <span class="hljs-keyword">else</span> {
        log.Printf(<span class="hljs-string">"Received message msgId: %v -- content: '%s'\n"</span>, msg.ID(), <span class="hljs-keyword">string</span>(msg.Payload()))
        fmt.Fprintf(w, <span class="hljs-string">"Received message msgId: %v -- content: '%s'\n"</span>, msg.ID(), <span class="hljs-keyword">string</span>(msg.Payload()))
        consumer.Ack(msg)
    }
})

err = http.ListenAndServe(<span class="hljs-string">":"</span>+strconv.Itoa(webPort), <span class="hljs-literal">nil</span>)
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
</code></pre>
<ol start="2">
<li>To scrape metrics from applications, configure a local running Prometheus instance using a configuration file (<code>prometheus.yml</code>).</li>
</ol>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">scrape_configs:</span>
<span class="hljs-bullet">-</span> <span class="hljs-attr">job_name:</span> <span class="hljs-string">pulsar-client-go-metrics</span>
  <span class="hljs-attr">scrape_interval:</span> <span class="hljs-string">10s</span>
  <span class="hljs-attr">static_configs:</span>
  <span class="hljs-bullet">-</span> <span class="hljs-attr">targets:</span>
    <span class="hljs-bullet">-</span> <span class="hljs-string">localhost:2112</span>
</code></pre>
<p>Now you can query Pulsar client metrics on Prometheus.</p>
<h3><a class="anchor" aria-hidden="true" id="consumer-configuration"></a><a href="#consumer-configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer configuration</h3>
<table>
<thead>
<tr><th style="text-align:left">Name</th><th style="text-align:left">Description</th><th style="text-align:left">Default</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">Topic</td><td style="text-align:left">Topic specify the topic this consumer will subscribe to. This argument is required when constructing the reader.</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">Topics</td><td style="text-align:left">Specify a list of topics this consumer will subscribe on. Either a topic, a list of topics or a topics pattern are required when subscribing</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">TopicsPattern</td><td style="text-align:left">Specify a regular expression to subscribe to multiple topics under the same namespace. Either a topic, a list of topics or a topics pattern are required when subscribing</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">AutoDiscoveryPeriod</td><td style="text-align:left">Specify the interval in which to poll for new partitions or new topics if using a TopicsPattern.</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">SubscriptionName</td><td style="text-align:left">Specify the subscription name for this consumer. This argument is required when subscribing</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">Name</td><td style="text-align:left">Set the consumer name</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">Properties</td><td style="text-align:left">Properties attach a set of application defined properties to the producer This properties will be visible in the topic stats</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">Type</td><td style="text-align:left">Select the subscription type to be used when subscribing to the topic.</td><td style="text-align:left">Exclusive</td></tr>
<tr><td style="text-align:left">SubscriptionInitialPosition</td><td style="text-align:left">InitialPosition at which the cursor will be set when subscribe</td><td style="text-align:left">Latest</td></tr>
<tr><td style="text-align:left">DLQ</td><td style="text-align:left">Configuration for Dead Letter Queue consumer policy.</td><td style="text-align:left">no DLQ</td></tr>
<tr><td style="text-align:left">MessageChannel</td><td style="text-align:left">Sets a <code>MessageChannel</code> for the consumer. When a message is received, it will be pushed to the channel for consumption</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">ReceiverQueueSize</td><td style="text-align:left">Sets the size of the consumer receive queue.</td><td style="text-align:left">1000</td></tr>
<tr><td style="text-align:left">NackRedeliveryDelay</td><td style="text-align:left">The delay after which to redeliver the messages that failed to be processed</td><td style="text-align:left">1min</td></tr>
<tr><td style="text-align:left">ReadCompacted</td><td style="text-align:left">If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog of the topic</td><td style="text-align:left">false</td></tr>
<tr><td style="text-align:left">ReplicateSubscriptionState</td><td style="text-align:left">Mark the subscription as replicated to keep it in sync across clusters</td><td style="text-align:left">false</td></tr>
<tr><td style="text-align:left">KeySharedPolicy</td><td style="text-align:left">Configuration for Key Shared consumer policy.</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">RetryEnable</td><td style="text-align:left">Auto retry send messages to default filled DLQPolicy topics</td><td style="text-align:left">false</td></tr>
<tr><td style="text-align:left">Interceptors</td><td style="text-align:left">A chain of interceptors. These interceptors are called at some points defined in the <code>ConsumerInterceptor</code> interface.</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">MaxReconnectToBroker</td><td style="text-align:left">MaxReconnectToBroker set the maximum retry number of reconnectToBroker.</td><td style="text-align:left">ultimate</td></tr>
<tr><td style="text-align:left">Schema</td><td style="text-align:left">Schema set a custom schema type by passing an implementation of <code>Schema</code></td><td style="text-align:left">bytes[]</td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="readers"></a><a href="#readers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Readers</h2>
<p>Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recent unacked message). You can <a href="#reader-configuration">configure</a> Go readers using a <code>ReaderOptions</code> object. Here's an example:</p>
<pre><code class="hljs css language-go">reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          <span class="hljs-string">"topic-1"</span>,
    StartMessageID: pulsar.EarliestMessageID(),
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> reader.Close()
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="reader-operations"></a><a href="#reader-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reader operations</h3>
<p>Pulsar Go readers have the following methods available:</p>
<table>
<thead>
<tr><th style="text-align:left">Method</th><th style="text-align:left">Description</th><th style="text-align:left">Return type</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>Topic()</code></td><td style="text-align:left">Returns the reader's <a href="/docs/en/reference-terminology#topic">topic</a></td><td style="text-align:left"><code>string</code></td></tr>
<tr><td style="text-align:left"><code>Next(context.Context)</code></td><td style="text-align:left">Receives the next message on the topic (analogous to the <code>Receive</code> method for <a href="#consumer-operations">consumers</a>). This method blocks until a message is available.</td><td style="text-align:left"><code>(Message, error)</code></td></tr>
<tr><td style="text-align:left"><code>HasNext()</code></td><td style="text-align:left">Check if there is any message available to read from the current position</td><td style="text-align:left">(bool, error)</td></tr>
<tr><td style="text-align:left"><code>Close()</code></td><td style="text-align:left">Closes the reader, disabling its ability to receive messages from the broker</td><td style="text-align:left"><code>error</code></td></tr>
<tr><td style="text-align:left"><code>Seek(MessageID)</code></td><td style="text-align:left">Reset the subscription associated with this reader to a specific message ID</td><td style="text-align:left"><code>error</code></td></tr>
<tr><td style="text-align:left"><code>SeekByTime(time time.Time)</code></td><td style="text-align:left">Reset the subscription associated with this reader to a specific message publish time</td><td style="text-align:left"><code>error</code></td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="reader-example"></a><a href="#reader-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reader example</h3>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-reader-to-read-next-message"></a><a href="#how-to-use-reader-to-read-next-message" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use reader to read 'next' message</h4>
<p>Here's an example usage of a Go reader that uses the <code>Next()</code> method to process incoming messages:</p>
<pre><code class="hljs css language-go"><span class="hljs-keyword">import</span> (
    <span class="hljs-string">"context"</span>
    <span class="hljs-string">"fmt"</span>
    <span class="hljs-string">"log"</span>

    <span class="hljs-string">"github.com/apache/pulsar-client-go/pulsar"</span>
)

<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">main</span><span class="hljs-params">()</span></span> {
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: <span class="hljs-string">"pulsar://localhost:6650"</span>})
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatal(err)
    }

    <span class="hljs-keyword">defer</span> client.Close()

    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          <span class="hljs-string">"topic-1"</span>,
        StartMessageID: pulsar.EarliestMessageID(),
    })
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
        log.Fatal(err)
    }
    <span class="hljs-keyword">defer</span> reader.Close()

    <span class="hljs-keyword">for</span> reader.HasNext() {
        msg, err := reader.Next(context.Background())
        <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
            log.Fatal(err)
        }

        fmt.Printf(<span class="hljs-string">"Received message msgId: %#v -- content: '%s'\n"</span>,
            msg.ID(), <span class="hljs-keyword">string</span>(msg.Payload()))
    }
}
</code></pre>
<p>In the example above, the reader begins reading from the earliest available message (specified by <code>pulsar.EarliestMessage</code>). The reader can also begin reading from the latest message (<code>pulsar.LatestMessage</code>) or some other message ID specified by bytes using the <code>DeserializeMessageID</code> function, which takes a byte array and returns a <code>MessageID</code> object. Here's an example:</p>
<pre><code class="hljs css language-go">lastSavedId := <span class="hljs-comment">// Read last saved message id from external store as byte[]</span>

reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          <span class="hljs-string">"my-golang-topic"</span>,
    StartMessageID: pulsar.DeserializeMessageID(lastSavedId),
})
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-reader-to-read-specific-message"></a><a href="#how-to-use-reader-to-read-specific-message" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use reader to read specific message</h4>
<pre><code class="hljs css language-go">client, err := NewClient(pulsar.ClientOptions{
    URL: lookupURL,
})

<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> client.Close()

topic := <span class="hljs-string">"topic-1"</span>
ctx := context.Background()

<span class="hljs-comment">// create producer</span>
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           topic,
    DisableBatching: <span class="hljs-literal">true</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> producer.Close()

<span class="hljs-comment">// send 10 messages</span>
msgIDs := [<span class="hljs-number">10</span>]MessageID{}
<span class="hljs-keyword">for</span> i := <span class="hljs-number">0</span>; i &lt; <span class="hljs-number">10</span>; i++ {
    msgID, err := producer.Send(ctx, &amp;pulsar.ProducerMessage{
        Payload: []<span class="hljs-keyword">byte</span>(fmt.Sprintf(<span class="hljs-string">"hello-%d"</span>, i)),
    })
    assert.NoError(t, err)
    assert.NotNil(t, msgID)
    msgIDs[i] = msgID
}

<span class="hljs-comment">// create reader on 5th message (not included)</span>
reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          topic,
    StartMessageID: msgIDs[<span class="hljs-number">4</span>],
})

<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> reader.Close()

<span class="hljs-comment">// receive the remaining 5 messages</span>
<span class="hljs-keyword">for</span> i := <span class="hljs-number">5</span>; i &lt; <span class="hljs-number">10</span>; i++ {
    msg, err := reader.Next(context.Background())
    <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}

<span class="hljs-comment">// create reader on 5th message (included)</span>
readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:                   topic,
    StartMessageID:          msgIDs[<span class="hljs-number">4</span>],
    StartMessageIDInclusive: <span class="hljs-literal">true</span>,
})

<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
    log.Fatal(err)
}
<span class="hljs-keyword">defer</span> readerInclusive.Close()
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="reader-configuration"></a><a href="#reader-configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reader configuration</h3>
<table>
<thead>
<tr><th style="text-align:left">Name</th><th style="text-align:left">Description</th><th style="text-align:left">Default</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">Topic</td><td style="text-align:left">Topic specify the topic this consumer will subscribe to. This argument is required when constructing the reader.</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">Name</td><td style="text-align:left">Name set the reader name.</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">Properties</td><td style="text-align:left">Attach a set of application defined properties to the reader. This properties will be visible in the topic stats</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">StartMessageID</td><td style="text-align:left">StartMessageID initial reader positioning is done by specifying a message id.</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">StartMessageIDInclusive</td><td style="text-align:left">If true, the reader will start at the <code>StartMessageID</code>, included. Default is <code>false</code> and the reader will start from the &quot;next&quot; message</td><td style="text-align:left">false</td></tr>
<tr><td style="text-align:left">MessageChannel</td><td style="text-align:left">MessageChannel sets a <code>MessageChannel</code> for the consumer When a message is received, it will be pushed to the channel for consumption</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">ReceiverQueueSize</td><td style="text-align:left">ReceiverQueueSize sets the size of the consumer receive queue.</td><td style="text-align:left">1000</td></tr>
<tr><td style="text-align:left">SubscriptionRolePrefix</td><td style="text-align:left">SubscriptionRolePrefix set the subscription role prefix.</td><td style="text-align:left">“reader”</td></tr>
<tr><td style="text-align:left">ReadCompacted</td><td style="text-align:left">If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog of the topic.  ReadCompacted can only be enabled when reading from a persistent topic.</td><td style="text-align:left">false</td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="messages"></a><a href="#messages" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Messages</h2>
<p>The Pulsar Go client provides a <code>ProducerMessage</code> interface that you can use to construct messages to producer on Pulsar topics. Here's an example message:</p>
<pre><code class="hljs css language-go">msg := pulsar.ProducerMessage{
    Payload: []<span class="hljs-keyword">byte</span>(<span class="hljs-string">"Here is some message data"</span>),
    Key: <span class="hljs-string">"message-key"</span>,
    Properties: <span class="hljs-keyword">map</span>[<span class="hljs-keyword">string</span>]<span class="hljs-keyword">string</span>{
        <span class="hljs-string">"foo"</span>: <span class="hljs-string">"bar"</span>,
    },
    EventTime: time.Now(),
    ReplicationClusters: []<span class="hljs-keyword">string</span>{<span class="hljs-string">"cluster1"</span>, <span class="hljs-string">"cluster3"</span>},
}

<span class="hljs-keyword">if</span> _, err := producer.send(msg); err != <span class="hljs-literal">nil</span> {
    log.Fatalf(<span class="hljs-string">"Could not publish message due to: %v"</span>, err)
}
</code></pre>
<p>The following methods parameters are available for <code>ProducerMessage</code> objects:</p>
<table>
<thead>
<tr><th style="text-align:left">Parameter</th><th style="text-align:left">Description</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>Payload</code></td><td style="text-align:left">The actual data payload of the message</td></tr>
<tr><td style="text-align:left"><code>Value</code></td><td style="text-align:left">Value and payload is mutually exclusive, <code>Value interface{}</code> for schema message.</td></tr>
<tr><td style="text-align:left"><code>Key</code></td><td style="text-align:left">The optional key associated with the message (particularly useful for things like topic compaction)</td></tr>
<tr><td style="text-align:left"><code>OrderingKey</code></td><td style="text-align:left">OrderingKey sets the ordering key of the message.</td></tr>
<tr><td style="text-align:left"><code>Properties</code></td><td style="text-align:left">A key-value map (both keys and values must be strings) for any application-specific metadata attached to the message</td></tr>
<tr><td style="text-align:left"><code>EventTime</code></td><td style="text-align:left">The timestamp associated with the message</td></tr>
<tr><td style="text-align:left"><code>ReplicationClusters</code></td><td style="text-align:left">The clusters to which this message will be replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default.</td></tr>
<tr><td style="text-align:left"><code>SequenceID</code></td><td style="text-align:left">Set the sequence id to assign to the current message</td></tr>
<tr><td style="text-align:left"><code>DeliverAfter</code></td><td style="text-align:left">Request to deliver the message only after the specified relative delay</td></tr>
<tr><td style="text-align:left"><code>DeliverAt</code></td><td style="text-align:left">Deliver the message only at or after the specified absolute timestamp</td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="tls-encryption-and-authentication"></a><a href="#tls-encryption-and-authentication" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>TLS encryption and authentication</h2>
<p>In order to use <a href="/docs/en/security-tls-transport">TLS encryption</a>, you'll need to configure your client to do so:</p>
<ul>
<li>Use <code>pulsar+ssl</code> URL type</li>
<li>Set <code>TLSTrustCertsFilePath</code> to the path to the TLS certs used by your client and the Pulsar broker</li>
<li>Configure <code>Authentication</code> option</li>
</ul>
<p>Here's an example:</p>
<pre><code class="hljs css language-go">opts := pulsar.ClientOptions{
    URL: <span class="hljs-string">"pulsar+ssl://my-cluster.com:6651"</span>,
    TLSTrustCertsFilePath: <span class="hljs-string">"/path/to/certs/my-cert.csr"</span>,
    Authentication: NewAuthenticationTLS(<span class="hljs-string">"my-cert.pem"</span>, <span class="hljs-string">"my-key.pem"</span>),
}
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="oauth2-authentication"></a><a href="#oauth2-authentication" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>OAuth2 authentication</h2>
<p>To use <a href="/docs/en/security-oauth2">OAuth2 authentication</a>, you'll need to configure your client to perform the following operations.
This example shows how to configure OAuth2 authentication.</p>
<pre><code class="hljs css language-go">oauth := pulsar.NewAuthenticationOAuth2(<span class="hljs-keyword">map</span>[<span class="hljs-keyword">string</span>]<span class="hljs-keyword">string</span>{
        <span class="hljs-string">"type"</span>:       <span class="hljs-string">"client_credentials"</span>,
        <span class="hljs-string">"issuerUrl"</span>:  <span class="hljs-string">"https://dev-kt-aa9ne.us.auth0.com"</span>,
        <span class="hljs-string">"audience"</span>:   <span class="hljs-string">"https://dev-kt-aa9ne.us.auth0.com/api/v2/"</span>,
        <span class="hljs-string">"privateKey"</span>: <span class="hljs-string">"/path/to/privateKey"</span>,
        <span class="hljs-string">"clientId"</span>:   <span class="hljs-string">"0Xx...Yyxeny"</span>,
    })
client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:              <span class="hljs-string">"pulsar://my-cluster:6650"</span>,
        Authentication:   oauth,
})
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/client-libraries-java"><span class="arrow-prev">← </span><span>Java</span></a><a class="docs-next button" href="/docs/en/client-libraries-python"><span>Python</span><span class="arrow-next"> →</span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#installation">Installation</a><ul class="toc-headings"><li><a href="#install-go-package">Install go package</a></li></ul></li><li><a href="#connection-urls">Connection URLs</a></li><li><a href="#create-a-client">Create a client</a></li><li><a href="#producers">Producers</a><ul class="toc-headings"><li><a href="#producer-operations">Producer operations</a></li><li><a href="#producer-example">Producer Example</a></li><li><a href="#producer-configuration">Producer configuration</a></li></ul></li><li><a href="#consumers">Consumers</a><ul class="toc-headings"><li><a href="#consumer-operations">Consumer operations</a></li><li><a href="#receive-example">Receive example</a></li><li><a href="#consumer-configuration">Consumer configuration</a></li></ul></li><li><a href="#readers">Readers</a><ul class="toc-headings"><li><a href="#reader-operations">Reader operations</a></li><li><a href="#reader-example">Reader example</a></li><li><a href="#reader-configuration">Reader configuration</a></li></ul></li><li><a href="#messages">Messages</a></li><li><a href="#tls-encryption-and-authentication">TLS encryption and authentication</a></li><li><a href="#oauth2-authentication">OAuth2 authentication</a></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
      const community = document.querySelector("a[href='#community']").parentNode;
      const communityMenu =
        '<li>' +
        '<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
        '<div id="community-dropdown" class="hide">' +
          '<ul id="community-dropdown-items">' +
            '<li><a href="/en/contact">Contact</a></li>' +
            '<li><a href="/en/contributing">Contributing</a></li>' +
            '<li><a href="/en/coding-guide">Coding guide</a></li>' +
            '<li><a href="/en/events">Events</a></li>' +
            '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
            '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
            '<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
            '<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
            '<li>&nbsp;</li>' +
            '<li><a href="/en/resources">Resources</a></li>' +
            '<li><a href="/en/team">Team</a></li>' +
            '<li><a href="/en/powered-by">Powered By</a></li>' +
          '</ul>' +
        '</div>' +
        '</li>';

      community.innerHTML = communityMenu;

      const communityMenuItem = document.getElementById("community-menu");
      const communityDropDown = document.getElementById("community-dropdown");
      communityMenuItem.addEventListener("click", function(event) {
        event.preventDefault();

        if (communityDropDown.className == 'hide') {
          communityDropDown.className = 'visible';
        } else {
          communityDropDown.className = 'hide';
        }
      });
    </script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>