| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Storm Cassandra Integration</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| |
| |
| <body> |
| <header> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-5"> |
| <a href="/index.html"><img src="/images/logo.png" class="logo" /></a> |
| </div> |
| <div class="col-md-5"> |
| |
| <h1>Version: 2.3.0</h1> |
| |
| </div> |
| <div class="col-md-2"> |
| <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a> |
| </div> |
| </div> |
| </div> |
| </header> |
| <!--Header End--> |
| <!--Navigation Begin--> |
| <div class="navbar" role="banner"> |
| <div class="container-fluid"> |
| <div class="navbar-header"> |
| <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| <li><a href="/index.html" id="home">Home</a></li> |
| <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li> |
| <li><a href="/about/integrates.html" id="project-info">Project Information</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| |
| |
| <li><a href="/releases/2.4.0/index.html">2.4.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.3.0/index.html">2.3.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.2.1/index.html">2.2.1</a></li> |
| |
| |
| |
| <li><a href="/releases/2.2.0/index.html">2.2.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.1.1/index.html">2.1.1</a></li> |
| |
| |
| |
| <li><a href="/releases/2.1.0/index.html">2.1.0</a></li> |
| |
| |
| |
| <li><a href="/releases/2.0.0/index.html">2.0.0</a></li> |
| |
| |
| |
| <li><a href="/releases/1.2.4/index.html">1.2.4</a></li> |
| |
| |
| |
| <li><a href="/releases/1.2.3/index.html">1.2.3</a></li> |
| |
| |
| </ul> |
| </li> |
| <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li> |
| <li><a href="/contribute/People.html">People</a></li> |
| <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> |
| <li><a href="/Powered-By.html">PoweredBy</a></li> |
| </ul> |
| </li> |
| <li><a href="/2022/03/25/storm240-released.html" id="news">News</a></li> |
| </ul> |
| </nav> |
| </div> |
| </div> |
| |
| |
| |
| <div class="container-fluid"> |
| <h1 class="page-title">Storm Cassandra Integration</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <div class="documentation-content"><h3 id="bolt-api-implementation-for-apache-cassandra">Bolt API implementation for Apache Cassandra</h3> |
| |
| <p>This library provides a core Storm bolt on top of Apache Cassandra. |
| It offers a simple DSL to map a Storm <em>Tuple</em> to a Cassandra Query Language <em>Statement</em>.</p> |
| |
| <h3 id="configuration">Configuration</h3> |
| |
| <p>The following properties may be set in the Storm configuration.</p> |
| |
| <table><thead> |
| <tr> |
| <th><strong>Property name</strong></th> |
| <th><strong>Description</strong></th> |
| <th><strong>Default</strong></th> |
| </tr> |
| </thead><tbody> |
| <tr> |
| <td><strong>cassandra.keyspace</strong></td> |
| <td>-</td> |
| <td></td> |
| </tr> |
| <tr> |
| <td><strong>cassandra.nodes</strong></td> |
| <td>-</td> |
| <td>{"localhost"}</td> |
| </tr> |
| <tr> |
| <td><strong>cassandra.username</strong></td> |
| <td>-</td> |
| <td>-</td> |
| </tr> |
| <tr> |
| <td><strong>cassandra.password</strong></td> |
| <td>-</td> |
| <td>-</td> |
| </tr> |
| <tr> |
| <td><strong>cassandra.port</strong></td> |
| <td>-</td> |
| <td>9042</td> |
| </tr> |
| <tr> |
| <td><strong>cassandra.output.consistencyLevel</strong></td> |
| <td>-</td> |
| <td>ONE</td> |
| </tr> |
| <tr> |
| <td><strong>cassandra.batch.size.rows</strong></td> |
| <td>-</td> |
| <td>100</td> |
| </tr> |
| <tr> |
| <td><strong>cassandra.retryPolicy</strong></td> |
| <td>-</td> |
| <td>DefaultRetryPolicy</td> |
| </tr> |
| <tr> |
| <td><strong>cassandra.reconnectionPolicy.baseDelayMs</strong></td> |
| <td>-</td> |
| <td>100 (ms)</td> |
| </tr> |
| <tr> |
| <td><strong>cassandra.reconnectionPolicy.maxDelayMs</strong></td> |
| <td>-</td> |
| <td>60000 (ms)</td> |
| </tr> |
| </tbody></table> |
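<p>As a rough illustration of how the properties above might be supplied, the sketch below fills a plain <code>Map</code>, which stands in for <code>org.apache.storm.Config</code> (itself a <code>HashMap&lt;String, Object&gt;</code>). The keyspace name, the comma-separated form of <code>cassandra.nodes</code>, and the exponential shape of the reconnection delay are assumptions made for this example, not statements about the library's internals.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Dependency-free sketch: a plain map stands in for org.apache.storm.Config.
final class CassandraConfSketch {

    // Builds a configuration using property names from the table above.
    static Map<String, Object> buildConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("cassandra.keyspace", "music");          // hypothetical keyspace
        conf.put("cassandra.nodes", "localhost");         // assumed comma-separated host list
        conf.put("cassandra.output.consistencyLevel", "ONE");
        conf.put("cassandra.batch.size.rows", 100);
        conf.put("cassandra.reconnectionPolicy.baseDelayMs", 100L);
        conf.put("cassandra.reconnectionPolicy.maxDelayMs", 60_000L);
        return conf;
    }

    // Assumption: the delay doubles on each attempt, starting at baseMs and capped at maxMs.
    static long reconnectDelayMs(long baseMs, long maxMs, int attempt) {
        long delay = baseMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = buildConf();
        long base = (Long) conf.get("cassandra.reconnectionPolicy.baseDelayMs");
        long max = (Long) conf.get("cassandra.reconnectionPolicy.maxDelayMs");
        for (int attempt = 0; attempt <= 10; attempt++) {
            System.out.println("attempt " + attempt + ": " + reconnectDelayMs(base, max, attempt) + " ms");
        }
    }
}
```

<p>In a real topology the same keys would be put on the <code>Config</code> object passed at submission time.</p>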
| |
| <h3 id="cassandrawriterbolt">CassandraWriterBolt</h3> |
| |
| <h4 id="static-import">Static import</h4> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">static</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">storm</span><span class="o">.</span><span class="na">cassandra</span><span class="o">.</span><span class="na">DynamicStatementBuilder</span><span class="o">.*</span><span class="o">;</span> |
| </code></pre></div> |
| <h4 id="insert-query-builder">Insert Query Builder</h4> |
| |
| <h5 id="insert-query-including-only-the-specified-tuple-fields">Insert query including only the specified tuple fields.</h5> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> |
| <span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span> |
| <span class="n">async</span><span class="o">(</span> |
| <span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">with</span><span class="o">(</span> |
| <span class="n">fields</span><span class="o">(</span><span class="s">"title"</span><span class="o">,</span> <span class="s">"year"</span><span class="o">,</span> <span class="s">"performer"</span><span class="o">,</span> <span class="s">"genre"</span><span class="o">,</span> <span class="s">"tracks"</span><span class="o">)</span> |
| <span class="o">)</span> |
| <span class="o">)</span> |
| <span class="o">);</span> |
| </code></pre></div> |
| <h5 id="insert-query-including-all-tuple-fields">Insert query including all tuple fields.</h5> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> |
| <span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span> |
| <span class="n">async</span><span class="o">(</span> |
| <span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">with</span><span class="o">(</span> <span class="n">all</span><span class="o">()</span> <span class="o">)</span> |
| <span class="o">)</span> |
| <span class="o">);</span> |
| </code></pre></div> |
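<p>To make the difference between <code>fields(...)</code> and <code>all()</code> concrete, here is a minimal, library-free sketch of the idea: a tuple is modelled as an ordered map, and the selected values are the ones that would be bound, in order, to the <code>?</code> markers. The class and method names (<code>FieldSelectionSketch</code>, <code>selectFields</code>, <code>selectAll</code>) and the extra <code>ingest_ts</code> field are hypothetical and are not part of storm-cassandra.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in only; this is not how storm-cassandra is implemented.
final class FieldSelectionSketch {

    // Returns the values of the named fields, in the order the names are given.
    static List<Object> selectFields(Map<String, Object> tuple, String... names) {
        List<Object> values = new ArrayList<>();
        for (String name : names) {
            if (!tuple.containsKey(name)) {
                throw new IllegalArgumentException("Tuple has no field: " + name);
            }
            values.add(tuple.get(name));
        }
        return values;
    }

    // Returns the values of every field, in the tuple's declaration order.
    static List<Object> selectAll(Map<String, Object> tuple) {
        return new ArrayList<>(tuple.values());
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("title", "Kind of Blue");
        tuple.put("year", 1959);
        tuple.put("performer", "Miles Davis");
        tuple.put("genre", "Jazz");
        tuple.put("tracks", 5);
        tuple.put("ingest_ts", 0L); // hypothetical extra field that the INSERT does not use

        // Five values, matching the five bind markers of the INSERT statement.
        System.out.println(selectFields(tuple, "title", "year", "performer", "genre", "tracks"));
        // Six values: selecting everything only fits when the tuple has exactly the query's fields.
        System.out.println(selectAll(tuple));
    }
}
```

<p>The practical point is that selecting all fields only lines up with the statement when the tuple carries exactly the columns the query expects, in the same order.</p>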
| <h5 id="insert-multiple-queries-from-one-input-tuple">Insert multiple queries from one input tuple.</h5> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span> |
| <span class="n">async</span><span class="o">(</span> |
| <span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">()),</span> |
| <span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">())</span> |
| <span class="o">)</span> |
| <span class="o">);</span> |
| </code></pre></div> |
| <h5 id="insert-query-using-querybuilder">Insert query using QueryBuilder</h5> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span> |
| <span class="n">async</span><span class="o">(</span> |
| <span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">())</span> |
| <span class="o">)</span> |
| <span class="o">);</span> |
| </code></pre></div> |
| <h5 id="insert-query-with-static-bound-query">Insert query with static bound query</h5> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span> |
| <span class="n">async</span><span class="o">(</span> |
| <span class="n">boundQuery</span><span class="o">(</span><span class="s">"INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">bind</span><span class="o">(</span><span class="n">all</span><span class="o">())</span> |
| <span class="o">)</span> |
| <span class="o">);</span> |
| </code></pre></div> |
| <h5 id="insert-query-with-static-bound-query-using-named-setters-and-aliases">Insert query with static bound query using named setters and aliases</h5> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span> |
| <span class="n">async</span><span class="o">(</span> |
| <span class="n">boundQuery</span><span class="o">(</span><span class="s">"INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">bind</span><span class="o">(</span> |
| <span class="n">field</span><span class="o">(</span><span class="s">"ti"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"title"</span><span class="o">),</span> |
| <span class="n">field</span><span class="o">(</span><span class="s">"ye"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"year"</span><span class="o">),</span> |
| <span class="n">field</span><span class="o">(</span><span class="s">"pe"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"performer"</span><span class="o">),</span> |
| <span class="n">field</span><span class="o">(</span><span class="s">"ge"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"genre"</span><span class="o">),</span> |
| <span class="n">field</span><span class="o">(</span><span class="s">"tr"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"tracks"</span><span class="o">)</span> |
| <span class="o">).</span><span class="na">byNamedSetters</span><span class="o">()</span> |
| <span class="o">)</span> |
| <span class="o">);</span> |
| </code></pre></div> |
| <h5 id="insert-query-with-bound-statement-load-from-storm-configuration">Insert query with bound statement load from storm configuration</h5> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span> |
| <span class="n">boundQuery</span><span class="o">(</span><span class="n">named</span><span class="o">(</span><span class="s">"insertIntoAlbum"</span><span class="o">))</span> |
| <span class="o">.</span><span class="na">bind</span><span class="o">(</span><span class="n">all</span><span class="o">()));</span> |
| </code></pre></div> |
| <h5 id="insert-query-with-bound-statement-load-from-tuple-field">Insert query with bound statement load from tuple field</h5> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span> |
| <span class="n">boundQuery</span><span class="o">(</span><span class="n">namedByField</span><span class="o">(</span><span class="s">"cql"</span><span class="o">))</span> |
| <span class="o">.</span><span class="na">bind</span><span class="o">(</span><span class="n">all</span><span class="o">()));</span> |
| </code></pre></div> |
| <h5 id="insert-query-with-batch-statement">Insert query with batch statement</h5> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="c1">// Logged</span> |
| <span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span><span class="n">loggedBatch</span><span class="o">(</span> |
| <span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">()),</span> |
| <span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">())</span> |
| <span class="o">)</span> |
| <span class="o">);</span> |
| <span class="c1">// UnLogged</span> |
| <span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span><span class="n">unLoggedBatch</span><span class="o">(</span> |
| <span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">()),</span> |
| <span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">())</span> |
| <span class="o">)</span> |
| <span class="o">);</span> |
| </code></pre></div> |
| <h3 id="how-to-handle-query-execution-results">How to handle query execution results</h3> |
| |
| <p>The interface <em>ExecutionResultHandler</em> can be used to customize how an execution result is handled.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">ExecutionResultHandler</span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kt">void</span> <span class="nf">onQueryValidationException</span><span class="o">(</span><span class="n">QueryValidationException</span> <span class="n">e</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span> |
| |
| <span class="kt">void</span> <span class="nf">onReadTimeoutException</span><span class="o">(</span><span class="n">ReadTimeoutException</span> <span class="n">e</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span> |
| |
| <span class="kt">void</span> <span class="nf">onWriteTimeoutException</span><span class="o">(</span><span class="n">WriteTimeoutException</span> <span class="n">e</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span> |
| |
| <span class="kt">void</span> <span class="nf">onUnavailableException</span><span class="o">(</span><span class="n">UnavailableException</span> <span class="n">e</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span> |
| |
| <span class="kt">void</span> <span class="nf">onQuerySuccess</span><span class="o">(</span><span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>By default, the CassandraBolt fails a tuple on any Cassandra exception (see <a href="https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java">BaseExecutionResultHandler</a>). A custom handler can be supplied through <code>withResultHandler</code>.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span><span class="n">insertInto</span><span class="o">(</span><span class="s">"album"</span><span class="o">).</span><span class="na">values</span><span class="o">(</span><span class="n">with</span><span class="o">(</span><span class="n">all</span><span class="o">())).</span><span class="na">build</span><span class="o">())</span> |
| <span class="o">.</span><span class="na">withResultHandler</span><span class="o">(</span><span class="k">new</span> <span class="n">MyCustomResultHandler</span><span class="o">());</span> |
| </code></pre></div> |
| <h3 id="declare-output-fields">Declare Output fields</h3> |
| |
| <p>A CassandraBolt can declare output fields and per-stream output fields. |
| For instance, this may be used to emit a new tuple on error, or to chain queries.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span><span class="n">insertInto</span><span class="o">(</span><span class="s">"album"</span><span class="o">).</span><span class="na">values</span><span class="o">(</span><span class="n">withFields</span><span class="o">(</span><span class="n">all</span><span class="o">())).</span><span class="na">build</span><span class="o">())</span> |
| <span class="o">.</span><span class="na">withResultHandler</span><span class="o">(</span><span class="k">new</span> <span class="n">EmitOnDriverExceptionResultHandler</span><span class="o">())</span> |
| <span class="o">.</span><span class="na">withStreamOutputFields</span><span class="o">(</span><span class="s">"stream_error"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"message"</span><span class="o">));</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">EmitOnDriverExceptionResultHandler</span> <span class="kd">extends</span> <span class="n">BaseExecutionResultHandler</span> <span class="o">{</span> |
| <span class="nd">@Override</span> |
| <span class="kd">protected</span> <span class="kt">void</span> <span class="nf">onDriverException</span><span class="o">(</span><span class="n">DriverException</span> <span class="n">e</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">LOG</span><span class="o">.</span><span class="na">error</span><span class="o">(</span><span class="s">"An error occurred while executing cassandra statement"</span><span class="o">,</span> <span class="n">e</span><span class="o">);</span> |
| <span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="s">"stream_error"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">e</span><span class="o">.</span><span class="na">getMessage</span><span class="o">()));</span> |
| <span class="n">collector</span><span class="o">.</span><span class="na">ack</span><span class="o">(</span><span class="n">tuple</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <h3 id="murmur3fieldgrouping">Murmur3StreamGrouping</h3> |
| |
| <p><a href="https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java">Murmur3StreamGrouping</a> can be used to optimise cassandra writes. |
| The stream is partitioned among the bolt's tasks based on the specified row partition keys.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">CassandraWriterBolt</span> <span class="n">bolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span> |
| <span class="n">insertInto</span><span class="o">(</span><span class="s">"album"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">values</span><span class="o">(</span> |
| <span class="n">with</span><span class="o">(</span><span class="n">fields</span><span class="o">(</span><span class="s">"title"</span><span class="o">,</span> <span class="s">"year"</span><span class="o">,</span> <span class="s">"performer"</span><span class="o">,</span> <span class="s">"genre"</span><span class="o">,</span> <span class="s">"tracks"</span><span class="o">)</span> |
| <span class="o">)).</span><span class="na">build</span><span class="o">());</span> |
| <span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"BOLT_WRITER"</span><span class="o">,</span> <span class="n">bolt</span><span class="o">,</span> <span class="mi">4</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">customGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Murmur3StreamGrouping</span><span class="o">(</span><span class="s">"title"</span><span class="o">));</span> |
| </code></pre></div> |
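<p>The partitioning idea can be sketched without any Storm dependency: hash the partition-key values of a tuple and map the hash onto one of the bolt's tasks, so that tuples with the same key always reach the same task. The sketch below deliberately uses <code>List.hashCode()</code> as a stand-in hash rather than Murmur3, and the names <code>PartitionKeyGroupingSketch</code> and <code>chooseTask</code> are hypothetical.</p>

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: a stand-in hash replaces Murmur3, and no Storm API is used.
final class PartitionKeyGroupingSketch {

    // Maps the partition-key values of a tuple to a task index in [0, numTasks).
    static int chooseTask(List<?> partitionKeyValues, int numTasks) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(partitionKeyValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // same parallelism as the setBolt call above
        int first = chooseTask(Arrays.asList("Kind of Blue"), numTasks);
        int second = chooseTask(Arrays.asList("Kind of Blue"), numTasks);
        int other = chooseTask(Arrays.asList("Abbey Road"), numTasks);

        System.out.println("same key, same task: " + (first == second));
        System.out.println("task for 'Kind of Blue': " + first);
        System.out.println("task for 'Abbey Road': " + other);
    }
}
```

<p>Using the same hash function as Cassandra's partitioner is what would let rows for one partition be written by one task; a generic hash such as the one above only demonstrates the routing, not that alignment.</p>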
| <h3 id="trident-api-support">Trident API support</h3> |
| |
| <p>storm-cassandra supports the Trident <code>state</code> API for <code>inserting</code> data into Cassandra.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="n">CassandraState</span><span class="o">.</span><span class="na">Options</span> <span class="n">options</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CassandraState</span><span class="o">.</span><span class="na">Options</span><span class="o">(</span><span class="k">new</span> <span class="n">CassandraContext</span><span class="o">());</span> |
| <span class="n">CQLStatementTupleMapper</span> <span class="n">insertTemperatureValues</span> <span class="o">=</span> <span class="n">boundQuery</span><span class="o">(</span> |
| <span class="s">"INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">bind</span><span class="o">(</span><span class="n">with</span><span class="o">(</span><span class="n">field</span><span class="o">(</span><span class="s">"weather_station_id"</span><span class="o">),</span> <span class="n">field</span><span class="o">(</span><span class="s">"name"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"weather_station_name"</span><span class="o">),</span> <span class="n">field</span><span class="o">(</span><span class="s">"event_time"</span><span class="o">).</span><span class="na">now</span><span class="o">(),</span> <span class="n">field</span><span class="o">(</span><span class="s">"temperature"</span><span class="o">)));</span> |
| <span class="n">options</span><span class="o">.</span><span class="na">withCQLStatementTupleMapper</span><span class="o">(</span><span class="n">insertTemperatureValues</span><span class="o">);</span> |
| <span class="n">CassandraStateFactory</span> <span class="n">insertValuesStateFactory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CassandraStateFactory</span><span class="o">(</span><span class="n">options</span><span class="o">);</span> |
| <span class="n">TridentState</span> <span class="n">selectState</span> <span class="o">=</span> <span class="n">topology</span><span class="o">.</span><span class="na">newStaticState</span><span class="o">(</span><span class="n">selectWeatherStationStateFactory</span><span class="o">);</span> |
| <span class="n">stream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">stateQuery</span><span class="o">(</span><span class="n">selectState</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"weather_station_id"</span><span class="o">),</span> <span class="k">new</span> <span class="n">CassandraQuery</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"name"</span><span class="o">));</span> |
| <span class="n">stream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">each</span><span class="o">(</span><span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"name"</span><span class="o">),</span> <span class="k">new</span> <span class="n">PrintFunction</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"name_x"</span><span class="o">));</span> |
| <span class="n">stream</span><span class="o">.</span><span class="na">partitionPersist</span><span class="o">(</span><span class="n">insertValuesStateFactory</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"weather_station_id"</span><span class="o">,</span> <span class="s">"name"</span><span class="o">,</span> <span class="s">"event_time"</span><span class="o">,</span> <span class="s">"temperature"</span><span class="o">),</span> <span class="k">new</span> <span class="n">CassandraStateUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">());</span> |
| </code></pre></div> |
| <p>The <code>state</code> API can also be used for <code>querying</code> data from Cassandra:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="n">CassandraState</span><span class="o">.</span><span class="na">Options</span> <span class="n">options</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CassandraState</span><span class="o">.</span><span class="na">Options</span><span class="o">(</span><span class="k">new</span> <span class="n">CassandraContext</span><span class="o">());</span> |
| <span class="n">CQLStatementTupleMapper</span> <span class="n">selectWeatherStation</span> <span class="o">=</span> <span class="n">boundQuery</span><span class="o">(</span><span class="s">"SELECT name FROM weather.station WHERE id = ?"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">bind</span><span class="o">(</span><span class="n">with</span><span class="o">(</span><span class="n">field</span><span class="o">(</span><span class="s">"weather_station_id"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"id"</span><span class="o">)));</span> |
| <span class="n">options</span><span class="o">.</span><span class="na">withCQLStatementTupleMapper</span><span class="o">(</span><span class="n">selectWeatherStation</span><span class="o">);</span> |
| <span class="n">options</span><span class="o">.</span><span class="na">withCQLResultSetValuesMapper</span><span class="o">(</span><span class="k">new</span> <span class="n">TridentResultSetValuesMapper</span><span class="o">(</span><span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"name"</span><span class="o">)));</span> |
| <span class="n">CassandraStateFactory</span> <span class="n">selectWeatherStationStateFactory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CassandraStateFactory</span><span class="o">(</span><span class="n">options</span><span class="o">);</span> |
| <span class="n">TridentState</span> <span class="n">selectState</span> <span class="o">=</span> <span class="n">topology</span><span class="o">.</span><span class="na">newStaticState</span><span class="o">(</span><span class="n">selectWeatherStationStateFactory</span><span class="o">);</span> |
| <span class="n">stream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">stateQuery</span><span class="o">(</span><span class="n">selectState</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"weather_station_id"</span><span class="o">),</span> <span class="k">new</span> <span class="n">CassandraQuery</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"name"</span><span class="o">));</span> |
| </code></pre></div></div> |
| |
| |
| </div> |
| </div> |
| </div> |
| <footer> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Meetups</h5> |
| <ul class="latest-news"> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm &amp; Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm &amp; Kafka Users</a> <span class="small">(Seattle, WA)</span></li> |
| |
| <li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li> |
| |
| <li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li> |
| |
| <li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li> |
| |
| <!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> --> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>About Apache Storm</h5> |
| <p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>First Look</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/Rationale.html">Rationale</a></li> |
| <li><a href="/releases/current/Tutorial.html">Tutorial</a></li> |
| <li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li> |
| <li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Documentation</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/index.html">Index</a></li> |
| <li><a href="/releases/current/javadocs/index.html">Javadoc</a></li> |
| <li><a href="/releases/current/FAQ.html">FAQ</a></li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| <hr/> |
| <div class="row"> |
| <div class="col-md-12"> |
| <p align="center">Copyright © 2022 <a href="https://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. |
| <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. |
| <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <!--Footer End--> |
| <!-- Scroll to top --> |
| <span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> |
| |
| </body> |
| |
| </html> |
| |