blob: a7f6c2aba864412971cac18275bfa66db7d4d6f6 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Pulsar adaptor for Apache Storm · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="Pulsar Storm is an adaptor for integrating with [Apache Storm](http://storm.apache.org/) topologies. It provides core Storm implementations for sending and receiving data."/><meta name="docsearch:version" content="2.1.0-incubating"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Pulsar adaptor for Apache Storm · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="Pulsar Storm is an adaptor for integrating with [Apache Storm](http://storm.apache.org/) topologies. It provides core Storm implementations for sending and receiving data."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.1.0-incubating</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.1.0-incubating/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.1.0-incubating/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.1.0-incubating/adaptors-storm">日本語</a></li><li><a href="/docs/fr/2.1.0-incubating/adaptors-storm">Français</a></li><li><a href="/docs/ko/2.1.0-incubating/adaptors-storm">한국어</a></li><li><a href="/docs/zh-CN/2.1.0-incubating/adaptors-storm">中文</a></li><li><a href="/docs/zh-TW/2.1.0-incubating/adaptors-storm">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Adaptors</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/getting-started-docker">Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries">Client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-api">API</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-deploying">Deploying functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-guarantees">Processing guarantees</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-state">State Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-metrics">Metrics</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/io-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/io-connectors">Builtin Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/io-develop">Developing Connectors</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-load-distribution">Load distribution</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-proxy">Pulsar proxy</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-schemas">Schemas</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/adaptors-spark">Apache Spark</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.1.0-incubating/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-message-queue">Message queue</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/reference-configuration">Pulsar configuration</a></li></ul></div></div></section></div><script>
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
var links = coll[i].nextElementSibling.getElementsByTagName('*');
if (checkActiveCategory){
for (var j = 0; j < links.length; j++) {
if (links[j].classList.contains('navListItemActive')){
coll[i].nextElementSibling.classList.toggle('hide');
coll[i].childNodes[1].classList.toggle('rotate');
checkActiveCategory = false;
break;
}
}
}
coll[i].addEventListener('click', function() {
var arrow = this.childNodes[1];
arrow.classList.toggle('rotate');
var content = this.nextElementSibling;
content.classList.toggle('hide');
});
}
document.addEventListener('DOMContentLoaded', function() {
createToggler('#navToggler', '#docsNav', 'docsSliderActive');
createToggler('#tocToggler', 'body', 'tocActive');
var headings = document.querySelector('.toc-headings');
headings && headings.addEventListener('click', function(event) {
var el = event.target;
while(el !== headings){
if (el.tagName === 'A') {
document.body.classList.remove('tocActive');
break;
} else{
el = el.parentNode;
}
}
}, false);
function createToggler(togglerSelector, targetSelector, className) {
var toggler = document.querySelector(togglerSelector);
var target = document.querySelector(targetSelector);
if (!toggler) {
return;
}
toggler.onclick = function(event) {
event.preventDefault();
target.classList.toggle(className);
};
}
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/adaptors-storm.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Pulsar adaptor for Apache Storm</h1></header><article><div><span><p>Pulsar Storm is an adaptor for integrating with <a href="http://storm.apache.org/">Apache Storm</a> topologies. It provides core Storm implementations for sending and receiving data.</p>
<p>An application can inject data into a Storm topology via a generic Pulsar spout, as well as consume data from a Storm topology via a generic Pulsar bolt.</p>
<h2><a class="anchor" aria-hidden="true" id="using-the-pulsar-storm-adaptor"></a><a href="#using-the-pulsar-storm-adaptor" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Using the Pulsar Storm Adaptor</h2>
<p>Include dependency for Pulsar Storm Adaptor:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.pulsar<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>pulsar-storm<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>${pulsar.version}<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="pulsar-spout"></a><a href="#pulsar-spout" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pulsar Spout</h2>
<p>The Pulsar Spout allows for the data published on a topic to be consumed by a Storm topology. It emits a Storm tuple based on the message received and the <code>MessageToValuesMapper</code> provided by the client.</p>
<p>The tuples that fail to be processed by the downstream bolts will be re-injected by the spout with an exponential backoff, within a configurable timeout (the default is 60 seconds) or a configurable number of retries, whichever comes first, after which it is acknowledged by the consumer. Here's an example construction of a spout:</p>
<pre><code class="hljs css language-java"><span class="hljs-comment">// Configure a Pulsar Client</span>
ClientConfiguration clientConf = <span class="hljs-keyword">new</span> ClientConfiguration();
<span class="hljs-comment">// Configure a Pulsar Consumer</span>
ConsumerConfiguration consumerConf = <span class="hljs-keyword">new</span> ConsumerConfiguration();
<span class="hljs-meta">@SuppressWarnings</span>(<span class="hljs-string">"serial"</span>)
MessageToValuesMapper messageToValuesMapper = <span class="hljs-keyword">new</span> MessageToValuesMapper() {
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Values <span class="hljs-title">toValues</span><span class="hljs-params">(Message msg)</span> </span>{
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> Values(<span class="hljs-keyword">new</span> String(msg.getData()));
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">declareOutputFields</span><span class="hljs-params">(OutputFieldsDeclarer declarer)</span> </span>{
<span class="hljs-comment">// declare the output fields</span>
declarer.declare(<span class="hljs-keyword">new</span> Fields(<span class="hljs-string">"string"</span>));
}
};
<span class="hljs-comment">// Configure a Pulsar Spout</span>
PulsarSpoutConfiguration spoutConf = <span class="hljs-keyword">new</span> PulsarSpoutConfiguration();
spoutConf.setServiceUrl(<span class="hljs-string">"pulsar://broker.messaging.usw.example.com:6650"</span>);
spoutConf.setTopic(<span class="hljs-string">"persistent://my-property/usw/my-ns/my-topic1"</span>);
spoutConf.setSubscriptionName(<span class="hljs-string">"my-subscriber-name1"</span>);
spoutConf.setMessageToValuesMapper(messageToValuesMapper);
<span class="hljs-comment">// Create a Pulsar Spout</span>
PulsarSpout spout = <span class="hljs-keyword">new</span> PulsarSpout(spoutConf, clientConf, consumerConf);
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="pulsar-bolt"></a><a href="#pulsar-bolt" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pulsar Bolt</h2>
<p>The Pulsar bolt allows data in a Storm topology to be published on a topic. It publishes messages based on the Storm tuple received and the <code>TupleToMessageMapper</code> provided by the client.</p>
<p>A partitioned topic can also be used to publish messages on different topics. In the implementation of the <code>TupleToMessageMapper</code>, a &quot;key&quot; will need to be provided in the message which will send the messages with the same key to the same topic. Here's an example bolt:</p>
<pre><code class="hljs css language-java"><span class="hljs-comment">// Configure a Pulsar Client</span>
ClientConfiguration clientConf = <span class="hljs-keyword">new</span> ClientConfiguration();
<span class="hljs-comment">// Configure a Pulsar Producer </span>
ProducerConfiguration producerConf = <span class="hljs-keyword">new</span> ProducerConfiguration();
<span class="hljs-meta">@SuppressWarnings</span>(<span class="hljs-string">"serial"</span>)
TupleToMessageMapper tupleToMessageMapper = <span class="hljs-keyword">new</span> TupleToMessageMapper() {
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Message <span class="hljs-title">toMessage</span><span class="hljs-params">(Tuple tuple)</span> </span>{
String receivedMessage = tuple.getString(<span class="hljs-number">0</span>);
<span class="hljs-comment">// message processing</span>
String processedMsg = receivedMessage + <span class="hljs-string">"-processed"</span>;
<span class="hljs-keyword">return</span> MessageBuilder.create().setContent(processedMsg.getBytes()).build();
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">declareOutputFields</span><span class="hljs-params">(OutputFieldsDeclarer declarer)</span> </span>{
<span class="hljs-comment">// declare the output fields</span>
}
};
<span class="hljs-comment">// Configure a Pulsar Bolt</span>
PulsarBoltConfiguration boltConf = <span class="hljs-keyword">new</span> PulsarBoltConfiguration();
boltConf.setServiceUrl(<span class="hljs-string">"pulsar://broker.messaging.usw.example.com:6650"</span>);
boltConf.setTopic(<span class="hljs-string">"persistent://my-property/usw/my-ns/my-topic2"</span>);
boltConf.setTupleToMessageMapper(tupleToMessageMapper);
<span class="hljs-comment">// Create a Pulsar Bolt</span>
PulsarBolt bolt = <span class="hljs-keyword">new</span> PulsarBolt(boltConf, clientConf);
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="example"></a><a href="#example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example</h2>
<p>You can find a complete example <a href="https://github.com/apache/incubator-pulsar/tree/master/pulsar-storm/src/test/java/org/apache/pulsar/storm/example/StormExample.java">here</a>.</p>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.1.0-incubating/adaptors-spark"><span class="arrow-prev"></span><span>Apache Spark</span></a><a class="docs-next button" href="/docs/en/2.1.0-incubating/cookbooks-tiered-storage"><span>Tiered Storage</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#using-the-pulsar-storm-adaptor">Using the Pulsar Storm Adaptor</a></li><li><a href="#pulsar-spout">Pulsar Spout</a></li><li><a href="#pulsar-bolt">Pulsar Bolt</a></li><li><a href="#example">Example</a></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>