| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Storm Redis Integration</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link href="/css/font-awesome.min.css" rel="stylesheet"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| </head> |
| |
| |
| <body> |
| <header> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-5"> |
| <a href="/index.html"><img src="/images/logo.png" class="logo"/></a> |
| </div> |
| <div class="col-md-5"> |
| |
| <h1>Version: 2.0.0</h1> |
| |
| </div> |
| <div class="col-md-2"> |
| <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a> |
| </div> |
| </div> |
| </div> |
| </header> |
| <!--Header End--> |
| <!--Navigation Begin--> |
| <div class="navbar" role="banner"> |
| <div class="container-fluid"> |
| <div class="navbar-header"> |
| <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| <li><a href="/index.html" id="home">Home</a></li> |
| <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li> |
| <li><a href="/about/integrates.html" id="project-info">Project Information</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b |
| class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| |
| |
| <li><a href="/releases/2.6.2/index.html">2.6.2</a></li> |
| |
| |
| |
| <li><a href="/releases/2.6.1/index.html">2.6.1</a></li> |
| |
| |
| |
| <li><a href="/releases/2.6.0/index.html">2.6.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.5.0/index.html">2.5.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.4.0/index.html">2.4.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.3.0/index.html">2.3.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.2.1/index.html">2.2.1</a></li> |
| |
| |
| |
| <li><a href="/releases/2.2.0/index.html">2.2.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.1.1/index.html">2.1.1</a></li> |
| |
| |
| |
| <li><a href="/releases/2.1.0/index.html">2.1.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.0.0/index.html">2.0.0</a></li> |
| |
| |
| |
| <li><a href="/releases/1.2.4/index.html">1.2.4</a></li> |
| |
| |
| |
| <li><a href="/releases/1.2.3/index.html">1.2.3</a></li> |
| |
| |
| </ul> |
| </li> |
| <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b |
| class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li> |
| <li><a href="/contribute/People.html">People</a></li> |
| <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> |
| <li><a href="/Powered-By.html">PoweredBy</a></li> |
| </ul> |
| </li> |
| <li><a href="/2024/04/05/storm262-released.html" id="news">News</a></li> |
| </ul> |
| </nav> |
| </div> |
| </div> |
| |
| |
| |
| <div class="container-fluid"> |
| <h1 class="page-title">Storm Redis Integration</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <div class="documentation-content"><p>Storm/Trident integration for <a href="http://redis.io/">Redis</a></p> |
| |
| <p>Storm-redis uses Jedis for Redis client.</p> |
| |
| <h2 id="usage">Usage</h2> |
| |
| <h3 id="how-do-i-use-it">How do I use it?</h3> |
| |
| <p>use it as a maven dependency:</p> |
| <div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.storm<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>storm-redis<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>${storm.version}<span class="nt"></version></span> |
| <span class="nt"><type></span>jar<span class="nt"></type></span> |
| <span class="nt"></dependency></span> |
| </code></pre></div> |
| <h3 id="for-normal-bolt">For normal Bolt</h3> |
| |
| <p>Storm-redis provides basic Bolt implementations, <code>RedisLookupBolt</code> and <code>RedisStoreBolt</code>, and <code>RedisFilterBolt</code>.</p> |
| |
| <p>As name represents its usage, <code>RedisLookupBolt</code> retrieves value from Redis using key, and <code>RedisStoreBolt</code> stores key / value to Redis, and <code>RedisFilterBolt</code> filters out tuple which key or field doesn't exist on Redis.</p> |
| |
| <p>One tuple will be matched to one key / value pair, and you can define match pattern to <code>TupleMapper</code>.</p> |
| |
| <p>You can also choose data type from <code>RedisDataTypeDescription</code> to use. Please refer <code>RedisDataTypeDescription.RedisDataType</code> to see what data types are supported. In some data types (hash and sorted set, and set if only RedisFilterBolt), it requires additional key and converted key from tuple becomes element.</p> |
| |
| <p>These interfaces are combined with <code>RedisLookupMapper</code> and <code>RedisStoreMapper</code> and <code>RedisFilterMapper</code> which fit <code>RedisLookupBolt</code> and <code>RedisStoreBolt</code>, and <code>RedisFilterBolt</code> respectively. |
| (When you want to implement RedisFilterMapper, be sure to set declareOutputFields() to declare same fields to input stream, since FilterBolt forwards input tuples when they exist on Redis.) </p> |
| |
| <h4 id="redislookupbolt-example">RedisLookupBolt example</h4> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">class</span> <span class="nc">WordCountRedisLookupMapper</span> <span class="kd">implements</span> <span class="nc">RedisLookupMapper</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="nc">RedisDataTypeDescription</span> <span class="n">description</span><span class="o">;</span> |
| <span class="kd">private</span> <span class="kd">final</span> <span class="nc">String</span> <span class="n">hashKey</span> <span class="o">=</span> <span class="s">"wordCount"</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="nf">WordCountRedisLookupMapper</span><span class="o">()</span> <span class="o">{</span> |
| <span class="n">description</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">RedisDataTypeDescription</span><span class="o">(</span> |
| <span class="nc">RedisDataTypeDescription</span><span class="o">.</span><span class="na">RedisDataType</span><span class="o">.</span><span class="na">HASH</span><span class="o">,</span> <span class="n">hashKey</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="nc">List</span><span class="o"><</span><span class="nc">Values</span><span class="o">></span> <span class="nf">toTuple</span><span class="o">(</span><span class="nc">ITuple</span> <span class="n">input</span><span class="o">,</span> <span class="nc">Object</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> |
| <span class="nc">String</span> <span class="n">member</span> <span class="o">=</span> <span class="n">getKeyFromTuple</span><span class="o">(</span><span class="n">input</span><span class="o">);</span> |
| <span class="nc">List</span><span class="o"><</span><span class="nc">Values</span><span class="o">></span> <span class="n">values</span> <span class="o">=</span> <span class="nc">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">();</span> |
| <span class="n">values</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="k">new</span> <span class="nc">Values</span><span class="o">(</span><span class="n">member</span><span class="o">,</span> <span class="n">value</span><span class="o">));</span> |
| <span class="k">return</span> <span class="n">values</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">declareOutputFields</span><span class="o">(</span><span class="nc">OutputFieldsDeclarer</span> <span class="n">declarer</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">declarer</span><span class="o">.</span><span class="na">declare</span><span class="o">(</span><span class="k">new</span> <span class="nc">Fields</span><span class="o">(</span><span class="s">"wordName"</span><span class="o">,</span> <span class="s">"count"</span><span class="o">));</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="nc">RedisDataTypeDescription</span> <span class="nf">getDataTypeDescription</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">description</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="nc">String</span> <span class="nf">getKeyFromTuple</span><span class="o">(</span><span class="nc">ITuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"word"</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="nc">String</span> <span class="nf">getValueFromTuple</span><span class="o">(</span><span class="nc">ITuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="kc">null</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| </code></pre></div><div class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">JedisPoolConfig</span> <span class="n">poolConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">JedisPoolConfig</span><span class="o">.</span><span class="na">Builder</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">setHost</span><span class="o">(</span><span class="n">host</span><span class="o">).</span><span class="na">setPort</span><span class="o">(</span><span class="n">port</span><span class="o">).</span><span class="na">build</span><span class="o">();</span> |
| <span class="nc">RedisLookupMapper</span> <span class="n">lookupMapper</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">WordCountRedisLookupMapper</span><span class="o">();</span> |
| <span class="nc">RedisLookupBolt</span> <span class="n">lookupBolt</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">RedisLookupBolt</span><span class="o">(</span><span class="n">poolConfig</span><span class="o">,</span> <span class="n">lookupMapper</span><span class="o">);</span> |
| </code></pre></div> |
| <h4 id="redisfilterbolt-example">RedisFilterBolt example</h4> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">class</span> <span class="nc">BlacklistWordFilterMapper</span> <span class="kd">implements</span> <span class="nc">RedisFilterMapper</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="nc">RedisDataTypeDescription</span> <span class="n">description</span><span class="o">;</span> |
| <span class="kd">private</span> <span class="kd">final</span> <span class="nc">String</span> <span class="n">setKey</span> <span class="o">=</span> <span class="s">"blacklist"</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="nf">BlacklistWordFilterMapper</span><span class="o">()</span> <span class="o">{</span> |
| <span class="n">description</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">RedisDataTypeDescription</span><span class="o">(</span> |
| <span class="nc">RedisDataTypeDescription</span><span class="o">.</span><span class="na">RedisDataType</span><span class="o">.</span><span class="na">SET</span><span class="o">,</span> <span class="n">setKey</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">declareOutputFields</span><span class="o">(</span><span class="nc">OutputFieldsDeclarer</span> <span class="n">declarer</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">declarer</span><span class="o">.</span><span class="na">declare</span><span class="o">(</span><span class="k">new</span> <span class="nc">Fields</span><span class="o">(</span><span class="s">"word"</span><span class="o">,</span> <span class="s">"count"</span><span class="o">));</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="nc">RedisDataTypeDescription</span> <span class="nf">getDataTypeDescription</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">description</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="nc">String</span> <span class="nf">getKeyFromTuple</span><span class="o">(</span><span class="nc">ITuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"word"</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="nc">String</span> <span class="nf">getValueFromTuple</span><span class="o">(</span><span class="nc">ITuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="kc">null</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| </code></pre></div><div class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">JedisPoolConfig</span> <span class="n">poolConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">JedisPoolConfig</span><span class="o">.</span><span class="na">Builder</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">setHost</span><span class="o">(</span><span class="n">host</span><span class="o">).</span><span class="na">setPort</span><span class="o">(</span><span class="n">port</span><span class="o">).</span><span class="na">build</span><span class="o">();</span> |
| <span class="nc">RedisFilterMapper</span> <span class="n">filterMapper</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">BlacklistWordFilterMapper</span><span class="o">();</span> |
| <span class="nc">RedisFilterBolt</span> <span class="n">filterBolt</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">RedisFilterBolt</span><span class="o">(</span><span class="n">poolConfig</span><span class="o">,</span> <span class="n">filterMapper</span><span class="o">);</span> |
| </code></pre></div> |
| <h4 id="redisstorebolt-example">RedisStoreBolt example</h4> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">class</span> <span class="nc">WordCountStoreMapper</span> <span class="kd">implements</span> <span class="nc">RedisStoreMapper</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="nc">RedisDataTypeDescription</span> <span class="n">description</span><span class="o">;</span> |
| <span class="kd">private</span> <span class="kd">final</span> <span class="nc">String</span> <span class="n">hashKey</span> <span class="o">=</span> <span class="s">"wordCount"</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="nf">WordCountStoreMapper</span><span class="o">()</span> <span class="o">{</span> |
| <span class="n">description</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">RedisDataTypeDescription</span><span class="o">(</span> |
| <span class="nc">RedisDataTypeDescription</span><span class="o">.</span><span class="na">RedisDataType</span><span class="o">.</span><span class="na">HASH</span><span class="o">,</span> <span class="n">hashKey</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="nc">RedisDataTypeDescription</span> <span class="nf">getDataTypeDescription</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">description</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="nc">String</span> <span class="nf">getKeyFromTuple</span><span class="o">(</span><span class="nc">ITuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"word"</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="nc">String</span> <span class="nf">getValueFromTuple</span><span class="o">(</span><span class="nc">ITuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"count"</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| </code></pre></div><div class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">JedisPoolConfig</span> <span class="n">poolConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">JedisPoolConfig</span><span class="o">.</span><span class="na">Builder</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">setHost</span><span class="o">(</span><span class="n">host</span><span class="o">).</span><span class="na">setPort</span><span class="o">(</span><span class="n">port</span><span class="o">).</span><span class="na">build</span><span class="o">();</span> |
| <span class="nc">RedisStoreMapper</span> <span class="n">storeMapper</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">WordCountStoreMapper</span><span class="o">();</span> |
| <span class="nc">RedisStoreBolt</span> <span class="n">storeBolt</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">RedisStoreBolt</span><span class="o">(</span><span class="n">poolConfig</span><span class="o">,</span> <span class="n">storeMapper</span><span class="o">);</span> |
| </code></pre></div> |
| <h3 id="for-non-simple-bolt">For non-simple Bolt</h3> |
| |
| <p>If your scenario doesn't fit <code>RedisStoreBolt</code> and <code>RedisLookupBolt</code> and <code>RedisFilterBolt</code>, storm-redis also provides <code>AbstractRedisBolt</code> to let you extend and apply your business logic.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">LookupWordTotalCountBolt</span> <span class="kd">extends</span> <span class="nc">AbstractRedisBolt</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="nc">Logger</span> <span class="no">LOG</span> <span class="o">=</span> <span class="nc">LoggerFactory</span><span class="o">.</span><span class="na">getLogger</span><span class="o">(</span><span class="nc">LookupWordTotalCountBolt</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| <span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="nc">Random</span> <span class="no">RANDOM</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">Random</span><span class="o">();</span> |
| |
| <span class="kd">public</span> <span class="nf">LookupWordTotalCountBolt</span><span class="o">(</span><span class="nc">JedisPoolConfig</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span> |
| <span class="kd">super</span><span class="o">(</span><span class="n">config</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="nf">LookupWordTotalCountBolt</span><span class="o">(</span><span class="nc">JedisClusterConfig</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span> |
| <span class="kd">super</span><span class="o">(</span><span class="n">config</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="nc">Tuple</span> <span class="n">input</span><span class="o">)</span> <span class="o">{</span> |
| <span class="nc">JedisCommands</span> <span class="n">jedisCommands</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> |
| <span class="k">try</span> <span class="o">{</span> |
| <span class="n">jedisCommands</span> <span class="o">=</span> <span class="n">getInstance</span><span class="o">();</span> |
| <span class="nc">String</span> <span class="n">wordName</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">getStringByField</span><span class="o">(</span><span class="s">"word"</span><span class="o">);</span> |
| <span class="nc">String</span> <span class="n">countStr</span> <span class="o">=</span> <span class="n">jedisCommands</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">wordName</span><span class="o">);</span> |
| <span class="k">if</span> <span class="o">(</span><span class="n">countStr</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span> |
| <span class="kt">int</span> <span class="n">count</span> <span class="o">=</span> <span class="nc">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">countStr</span><span class="o">);</span> |
| <span class="k">this</span><span class="o">.</span><span class="na">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="nc">Values</span><span class="o">(</span><span class="n">wordName</span><span class="o">,</span> <span class="n">count</span><span class="o">));</span> |
| |
| <span class="c1">// print lookup result with low probability</span> |
| <span class="k">if</span><span class="o">(</span><span class="no">RANDOM</span><span class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span class="mi">1000</span><span class="o">)</span> <span class="o">></span> <span class="mi">995</span><span class="o">)</span> <span class="o">{</span> |
| <span class="no">LOG</span><span class="o">.</span><span class="na">info</span><span class="o">(</span><span class="s">"Lookup result - word : "</span> <span class="o">+</span> <span class="n">wordName</span> <span class="o">+</span> <span class="s">" / count : "</span> <span class="o">+</span> <span class="n">count</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span> <span class="k">else</span> <span class="o">{</span> |
| <span class="c1">// skip</span> |
| <span class="no">LOG</span><span class="o">.</span><span class="na">warn</span><span class="o">(</span><span class="s">"Word not found in Redis - word : "</span> <span class="o">+</span> <span class="n">wordName</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span> <span class="k">finally</span> <span class="o">{</span> |
| <span class="k">if</span> <span class="o">(</span><span class="n">jedisCommands</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">returnInstance</span><span class="o">(</span><span class="n">jedisCommands</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="k">this</span><span class="o">.</span><span class="na">collector</span><span class="o">.</span><span class="na">ack</span><span class="o">(</span><span class="n">input</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">declareOutputFields</span><span class="o">(</span><span class="nc">OutputFieldsDeclarer</span> <span class="n">declarer</span><span class="o">)</span> <span class="o">{</span> |
| <span class="c1">// wordName, count</span> |
| <span class="n">declarer</span><span class="o">.</span><span class="na">declare</span><span class="o">(</span><span class="k">new</span> <span class="nc">Fields</span><span class="o">(</span><span class="s">"wordName"</span><span class="o">,</span> <span class="s">"count"</span><span class="o">));</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <h3 id="trident-state-usage">Trident State usage</h3> |
| |
| <ol> |
| <li><p>RedisState and RedisMapState, which provide Jedis interface just for single redis.</p></li> |
| <li><p>RedisClusterState and RedisClusterMapState, which provide JedisCluster interface, just for redis cluster.</p></li> |
| </ol> |
| |
| <p>RedisState</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="nc">JedisPoolConfig</span> <span class="n">poolConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">JedisPoolConfig</span><span class="o">.</span><span class="na">Builder</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">setHost</span><span class="o">(</span><span class="n">redisHost</span><span class="o">).</span><span class="na">setPort</span><span class="o">(</span><span class="n">redisPort</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">build</span><span class="o">();</span> |
| <span class="nc">RedisStoreMapper</span> <span class="n">storeMapper</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">WordCountStoreMapper</span><span class="o">();</span> |
| <span class="nc">RedisLookupMapper</span> <span class="n">lookupMapper</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">WordCountLookupMapper</span><span class="o">();</span> |
| <span class="nc">RedisState</span><span class="o">.</span><span class="na">Factory</span> <span class="n">factory</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">RedisState</span><span class="o">.</span><span class="na">Factory</span><span class="o">(</span><span class="n">poolConfig</span><span class="o">);</span> |
| |
| <span class="nc">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">TridentTopology</span><span class="o">();</span> |
| <span class="nc">Stream</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">topology</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="n">spout</span><span class="o">);</span> |
| |
| <span class="n">stream</span><span class="o">.</span><span class="na">partitionPersist</span><span class="o">(</span><span class="n">factory</span><span class="o">,</span> |
| <span class="n">fields</span><span class="o">,</span> |
| <span class="k">new</span> <span class="nf">RedisStateUpdater</span><span class="o">(</span><span class="n">storeMapper</span><span class="o">).</span><span class="na">withExpire</span><span class="o">(</span><span class="mi">86400000</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">Fields</span><span class="o">());</span> |
| |
| <span class="nc">TridentState</span> <span class="n">state</span> <span class="o">=</span> <span class="n">topology</span><span class="o">.</span><span class="na">newStaticState</span><span class="o">(</span><span class="n">factory</span><span class="o">);</span> |
| <span class="n">stream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">stateQuery</span><span class="o">(</span><span class="n">state</span><span class="o">,</span> <span class="k">new</span> <span class="nc">Fields</span><span class="o">(</span><span class="s">"word"</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">RedisStateQuerier</span><span class="o">(</span><span class="n">lookupMapper</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">Fields</span><span class="o">(</span><span class="s">"columnName"</span><span class="o">,</span><span class="s">"columnValue"</span><span class="o">));</span> |
| </code></pre></div> |
| <p>RedisClusterState</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="nc">Set</span><span class="o"><</span><span class="nc">InetSocketAddress</span><span class="o">></span> <span class="n">nodes</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HashSet</span><span class="o"><</span><span class="nc">InetSocketAddress</span><span class="o">>();</span> |
| <span class="k">for</span> <span class="o">(</span><span class="nc">String</span> <span class="n">hostPort</span> <span class="o">:</span> <span class="n">redisHostPort</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">","</span><span class="o">))</span> <span class="o">{</span> |
| <span class="nc">String</span><span class="o">[]</span> <span class="n">host_port</span> <span class="o">=</span> <span class="n">hostPort</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">":"</span><span class="o">);</span> |
| <span class="n">nodes</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="k">new</span> <span class="nc">InetSocketAddress</span><span class="o">(</span><span class="n">host_port</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> <span class="nc">Integer</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="n">host_port</span><span class="o">[</span><span class="mi">1</span><span class="o">])));</span> |
| <span class="o">}</span> |
| <span class="nc">JedisClusterConfig</span> <span class="n">clusterConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">JedisClusterConfig</span><span class="o">.</span><span class="na">Builder</span><span class="o">().</span><span class="na">setNodes</span><span class="o">(</span><span class="n">nodes</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">build</span><span class="o">();</span> |
| <span class="nc">RedisStoreMapper</span> <span class="n">storeMapper</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">WordCountStoreMapper</span><span class="o">();</span> |
| <span class="nc">RedisLookupMapper</span> <span class="n">lookupMapper</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">WordCountLookupMapper</span><span class="o">();</span> |
| <span class="nc">RedisClusterState</span><span class="o">.</span><span class="na">Factory</span> <span class="n">factory</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">RedisClusterState</span><span class="o">.</span><span class="na">Factory</span><span class="o">(</span><span class="n">clusterConfig</span><span class="o">);</span> |
| |
| <span class="nc">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">TridentTopology</span><span class="o">();</span> |
| <span class="nc">Stream</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">topology</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="n">spout</span><span class="o">);</span> |
| |
| <span class="n">stream</span><span class="o">.</span><span class="na">partitionPersist</span><span class="o">(</span><span class="n">factory</span><span class="o">,</span> |
| <span class="n">fields</span><span class="o">,</span> |
| <span class="k">new</span> <span class="nf">RedisClusterStateUpdater</span><span class="o">(</span><span class="n">storeMapper</span><span class="o">).</span><span class="na">withExpire</span><span class="o">(</span><span class="mi">86400000</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">Fields</span><span class="o">());</span> |
| |
| <span class="nc">TridentState</span> <span class="n">state</span> <span class="o">=</span> <span class="n">topology</span><span class="o">.</span><span class="na">newStaticState</span><span class="o">(</span><span class="n">factory</span><span class="o">);</span> |
| <span class="n">stream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">stateQuery</span><span class="o">(</span><span class="n">state</span><span class="o">,</span> <span class="k">new</span> <span class="nc">Fields</span><span class="o">(</span><span class="s">"word"</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">RedisClusterStateQuerier</span><span class="o">(</span><span class="n">lookupMapper</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">Fields</span><span class="o">(</span><span class="s">"columnName"</span><span class="o">,</span><span class="s">"columnValue"</span><span class="o">));</span> |
| </code></pre></div></div> |
| |
| |
| </div> |
| </div> |
| </div> |
| <footer> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-2"> |
| <div class="footer-widget"> |
| <h5>Meetups</h5> |
| <div class="footer-widget"> |
| <a class="acevent" data-format="wide" data-mode="dark"></a> |
| </div> |
| </div> |
| </div> |
| <div class="col-md-4"> |
| <div class="footer-widget"> |
| <h5>About Apache Storm</h5> |
| <p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p> |
| </div> |
| </div> |
| <div class="col-md-2"> |
| <div class="footer-widget"> |
| <h5>First Look</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/Rationale.html">Rationale</a></li> |
| <li><a href="/releases/current/Tutorial.html">Tutorial</a></li> |
| <li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li> |
| <li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-2"> |
| <div class="footer-widget"> |
| <h5>Documentation</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/index.html">Index</a></li> |
| <li><a href="/releases/current/javadocs/index.html">Javadoc</a></li> |
| <li><a href="/releases/current/FAQ.html">FAQ</a></li> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-2"> |
| <div class="footer-widget"> |
| <h5>Misc</h5> |
| <ul class="footer-list"> |
| <li><a href="https://www.apache.org/licenses/">Licenses</a></li> |
| <li><a href="https://www.apache.org/security/">Security</a></li> |
| <li><a href="https://www.apache.org/foundation/thanks.html">Sponsors</a></li> |
| <li><a href="https://privacy.apache.org/policies/privacy-policy-public.html">Privacy</a></li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| <hr/> |
| <div class="row"> |
| <div class="col-md-12"> |
| <p align="center">Copyright © 2024 <a href="https://www.apache.org">Apache Software Foundation</a> |
| . All Rights Reserved. |
| <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. |
| <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <!--Footer End--> |
| <!-- Matomo --> |
| <script> |
| var _paq = window._paq = window._paq || []; |
| /* tracker methods like "setCustomDimension" should be called before "trackPageView" */ |
| /* We explicitly disable cookie tracking to avoid privacy issues */ |
| _paq.push(['disableCookies']); |
| _paq.push(['trackPageView']); |
| _paq.push(['enableLinkTracking']); |
| (function() { |
| var u="//analytics.apache.org/"; |
| _paq.push(['setTrackerUrl', u+'matomo.php']); |
| _paq.push(['setSiteId', '38']); |
| var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0]; |
| g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s); |
| })(); |
| </script> |
| <!-- End Matomo Code --> |
| <script src="https://www.apachecon.com/event-images/snippet.js"></script> |
| <!-- Scroll to top --> |
| <span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> |
| |
| </body> |
| |
| </html> |
| |