blob: 2eff1fbef7cb7a612d223ad739fb17ff7a27d6de [file] [log] [blame]
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Storm Cassandra Integration</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 1.0.6</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.0.0-SNAPSHOT/index.html">2.0.0-SNAPSHOT</a></li>
<li><a href="/releases/1.2.1/index.html">1.2.1</a></li>
<li><a href="/releases/1.1.2/index.html">1.1.2</a></li>
<li><a href="/releases/1.0.6/index.html">1.0.6</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2018/02/19/storm121-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Storm Cassandra Integration</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><h3 id="bolt-api-implementation-for-apache-cassandra">Bolt API implementation for Apache Cassandra</h3>
<p>This library provides core storm bolt on top of Apache Cassandra.
Provides simple DSL to map storm <em>Tuple</em> to Cassandra Query Language <em>Statement</em>.</p>
<h3 id="configuration">Configuration</h3>
<p>The following properties may be passed to storm configuration.</p>
<table><thead>
<tr>
<th><strong>Property name</strong></th>
<th><strong>Description</strong></th>
<th><strong>Default</strong></th>
</tr>
</thead><tbody>
<tr>
<td><strong>cassandra.keyspace</strong></td>
<td>-</td>
<td></td>
</tr>
<tr>
<td><strong>cassandra.nodes</strong></td>
<td>-</td>
<td>{&quot;localhost&quot;}</td>
</tr>
<tr>
<td><strong>cassandra.username</strong></td>
<td>-</td>
<td>-</td>
</tr>
<tr>
<td><strong>cassandra.password</strong></td>
<td>-</td>
<td>-</td>
</tr>
<tr>
<td><strong>cassandra.port</strong></td>
<td>-</td>
<td>9092</td>
</tr>
<tr>
<td><strong>cassandra.output.consistencyLevel</strong></td>
<td>-</td>
<td>ONE</td>
</tr>
<tr>
<td><strong>cassandra.batch.size.rows</strong></td>
<td>-</td>
<td>100</td>
</tr>
<tr>
<td><strong>cassandra.retryPolicy</strong></td>
<td>-</td>
<td>DefaultRetryPolicy</td>
</tr>
<tr>
<td><strong>cassandra.reconnectionPolicy.baseDelayMs</strong></td>
<td>-</td>
<td>100 (ms)</td>
</tr>
<tr>
<td><strong>cassandra.reconnectionPolicy.maxDelayMs</strong></td>
<td>-</td>
<td>60000 (ms)</td>
</tr>
</tbody></table>
<h3 id="cassandrawriterbolt">CassandraWriterBolt</h3>
<h4 id="static-import">Static import</h4>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="kn">import</span> <span class="nn">static</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">storm</span><span class="o">.</span><span class="na">cassandra</span><span class="o">.</span><span class="na">DynamicStatementBuilder</span><span class="o">.*</span>
</code></pre></div>
<h4 id="insert-query-builder">Insert Query Builder</h4>
<h5 id="insert-query-including-only-the-specified-tuple-fields">Insert query including only the specified tuple fields.</h5>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span>
<span class="n">async</span><span class="o">(</span>
<span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">)</span>
<span class="o">.</span><span class="na">with</span><span class="o">(</span>
<span class="n">fields</span><span class="o">(</span><span class="s">"title"</span><span class="o">,</span> <span class="s">"year"</span><span class="o">,</span> <span class="s">"performer"</span><span class="o">,</span> <span class="s">"genre"</span><span class="o">,</span> <span class="s">"tracks"</span><span class="o">)</span>
<span class="o">)</span>
<span class="o">)</span>
<span class="o">);</span>
</code></pre></div>
<h5 id="insert-query-including-all-tuple-fields">Insert query including all tuple fields.</h5>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span>
<span class="n">async</span><span class="o">(</span>
<span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">)</span>
<span class="o">.</span><span class="na">with</span><span class="o">(</span> <span class="n">all</span><span class="o">()</span> <span class="o">)</span>
<span class="o">)</span>
<span class="o">);</span>
</code></pre></div>
<h5 id="insert-multiple-queries-from-one-input-tuple">Insert multiple queries from one input tuple.</h5>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span>
<span class="n">async</span><span class="o">(</span>
<span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">())),</span>
<span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">()))</span>
<span class="o">)</span>
<span class="o">);</span>
</code></pre></div>
<h5 id="insert-query-using-querybuilder">Insert query using QueryBuilder</h5>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span>
<span class="n">async</span><span class="o">(</span>
<span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO album (title,year,perfomer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">)</span>
<span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">()))</span>
<span class="o">)</span>
<span class="o">)</span>
</code></pre></div>
<h5 id="insert-query-with-static-bound-query">Insert query with static bound query</h5>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span>
<span class="n">async</span><span class="o">(</span>
<span class="n">boundQuery</span><span class="o">(</span><span class="s">"INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">)</span>
<span class="o">.</span><span class="na">bind</span><span class="o">(</span><span class="n">all</span><span class="o">());</span>
<span class="o">)</span>
<span class="o">);</span>
</code></pre></div>
<h5 id="insert-query-with-static-bound-query-using-named-setters-and-aliases">Insert query with static bound query using named setters and aliases</h5>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span>
<span class="n">async</span><span class="o">(</span>
<span class="n">boundQuery</span><span class="o">(</span><span class="s">"INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);"</span><span class="o">)</span>
<span class="o">.</span><span class="na">bind</span><span class="o">(</span>
<span class="n">field</span><span class="o">(</span><span class="s">"ti"</span><span class="o">),</span><span class="n">as</span><span class="o">(</span><span class="s">"title"</span><span class="o">),</span>
<span class="n">field</span><span class="o">(</span><span class="s">"ye"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"year"</span><span class="o">)),</span>
<span class="n">field</span><span class="o">(</span><span class="s">"pe"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"performer"</span><span class="o">)),</span>
<span class="n">field</span><span class="o">(</span><span class="s">"ge"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"genre"</span><span class="o">)),</span>
<span class="n">field</span><span class="o">(</span><span class="s">"tr"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"tracks"</span><span class="o">))</span>
<span class="o">).</span><span class="na">byNamedSetters</span><span class="o">()</span>
<span class="o">)</span>
<span class="o">);</span>
</code></pre></div>
<h5 id="insert-query-with-bound-statement-load-from-storm-configuration">Insert query with bound statement load from storm configuration</h5>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span>
<span class="n">boundQuery</span><span class="o">(</span><span class="n">named</span><span class="o">(</span><span class="s">"insertIntoAlbum"</span><span class="o">))</span>
<span class="o">.</span><span class="na">bind</span><span class="o">(</span><span class="n">all</span><span class="o">());</span>
</code></pre></div>
<h5 id="insert-query-with-bound-statement-load-from-tuple-field">Insert query with bound statement load from tuple field</h5>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span>
<span class="n">boundQuery</span><span class="o">(</span><span class="n">namedByField</span><span class="o">(</span><span class="s">"cql"</span><span class="o">))</span>
<span class="o">.</span><span class="na">bind</span><span class="o">(</span><span class="n">all</span><span class="o">());</span>
</code></pre></div>
<h5 id="insert-query-with-batch-statement">Insert query with batch statement</h5>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="c1">// Logged</span>
<span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span><span class="n">loggedBatch</span><span class="o">(</span>
<span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">())),</span>
<span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">()))</span>
<span class="o">)</span>
<span class="o">);</span>
<span class="c1">// UnLogged</span>
<span class="k">new</span> <span class="nf">CassandraWriterBolt</span><span class="o">(</span><span class="n">unLoggedBatch</span><span class="o">(</span>
<span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">())),</span>
<span class="n">simpleQuery</span><span class="o">(</span><span class="s">"INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);"</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="n">all</span><span class="o">()))</span>
<span class="o">)</span>
<span class="o">);</span>
</code></pre></div>
<h3 id="how-to-handle-query-execution-results">How to handle query execution results</h3>
<p>The interface <em>ExecutionResultHandler</em> can be used to custom how an execution result should be handle.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">ExecutionResultHandler</span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span>
<span class="kt">void</span> <span class="nf">onQueryValidationException</span><span class="o">(</span><span class="n">QueryValidationException</span> <span class="n">e</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span>
<span class="kt">void</span> <span class="nf">onReadTimeoutException</span><span class="o">(</span><span class="n">ReadTimeoutException</span> <span class="n">e</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span>
<span class="kt">void</span> <span class="nf">onWriteTimeoutException</span><span class="o">(</span><span class="n">WriteTimeoutException</span> <span class="n">e</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span>
<span class="kt">void</span> <span class="nf">onUnavailableException</span><span class="o">(</span><span class="n">UnavailableException</span> <span class="n">e</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span>
<span class="kt">void</span> <span class="nf">onQuerySuccess</span><span class="o">(</span><span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div>
<p>By default, the CassandraBolt fails a tuple on all Cassandra Exception (see <a href="https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java">BaseExecutionResultHandler</a>) .</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span><span class="n">insertInto</span><span class="o">(</span><span class="s">"album"</span><span class="o">).</span><span class="na">values</span><span class="o">(</span><span class="n">with</span><span class="o">(</span><span class="n">all</span><span class="o">()).</span><span class="na">build</span><span class="o">())</span>
<span class="o">.</span><span class="na">withResultHandler</span><span class="o">(</span><span class="k">new</span> <span class="n">MyCustomResultHandler</span><span class="o">());</span>
</code></pre></div>
<h3 id="declare-output-fields">Declare Output fields</h3>
<p>A CassandraBolt can declare output fields / stream output fields.
For instance, this may be used to remit a new tuple on error, or to chain queries.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span><span class="n">insertInto</span><span class="o">(</span><span class="s">"album"</span><span class="o">).</span><span class="na">values</span><span class="o">(</span><span class="n">withFields</span><span class="o">(</span><span class="n">all</span><span class="o">()).</span><span class="na">build</span><span class="o">())</span>
<span class="o">.</span><span class="na">withResultHandler</span><span class="o">(</span><span class="k">new</span> <span class="n">EmitOnDriverExceptionResultHandler</span><span class="o">());</span>
<span class="o">.</span><span class="na">withStreamOutputFields</span><span class="o">(</span><span class="s">"stream_error"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"message"</span><span class="o">);</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">EmitOnDriverExceptionResultHandler</span> <span class="kd">extends</span> <span class="n">BaseExecutionResultHandler</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">protected</span> <span class="kt">void</span> <span class="nf">onDriverException</span><span class="o">(</span><span class="n">DriverException</span> <span class="n">e</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Tuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span>
<span class="n">LOG</span><span class="o">.</span><span class="na">error</span><span class="o">(</span><span class="s">"An error occurred while executing cassandra statement"</span><span class="o">,</span> <span class="n">e</span><span class="o">);</span>
<span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="s">"stream_error"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">e</span><span class="o">.</span><span class="na">getMessage</span><span class="o">()));</span>
<span class="n">collector</span><span class="o">.</span><span class="na">ack</span><span class="o">(</span><span class="n">tuple</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<h3 id="murmur3fieldgrouping">Murmur3FieldGrouping</h3>
<p><a href="https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java">Murmur3StreamGrouping</a> can be used to optimise cassandra writes.
The stream is partitioned among the bolt&#39;s tasks based on the specified row partition keys.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">CassandraWriterBolt</span> <span class="n">bolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CassandraWriterBolt</span><span class="o">(</span>
<span class="n">insertInto</span><span class="o">(</span><span class="s">"album"</span><span class="o">)</span>
<span class="o">.</span><span class="na">values</span><span class="o">(</span>
<span class="n">with</span><span class="o">(</span><span class="n">fields</span><span class="o">(</span><span class="s">"title"</span><span class="o">,</span> <span class="s">"year"</span><span class="o">,</span> <span class="s">"performer"</span><span class="o">,</span> <span class="s">"genre"</span><span class="o">,</span> <span class="s">"tracks"</span><span class="o">)</span>
<span class="o">).</span><span class="na">build</span><span class="o">());</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"BOLT_WRITER"</span><span class="o">,</span> <span class="n">bolt</span><span class="o">,</span> <span class="mi">4</span><span class="o">)</span>
<span class="o">.</span><span class="na">customGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Murmur3StreamGrouping</span><span class="o">(</span><span class="s">"title"</span><span class="o">))</span>
</code></pre></div>
<h3 id="trident-api-support">Trident API support</h3>
<p>storm-cassandra support Trident <code>state</code> API for <code>inserting</code> data into Cassandra.
<code>java
CassandraState.Options options = new CassandraState.Options(new CassandraContext());
CQLStatementTupleMapper insertTemperatureValues = boundQuery(
&quot;INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)&quot;)
.bind(with(field(&quot;weather_station_id&quot;), field(&quot;name&quot;).as(&quot;weather_station_name&quot;), field(&quot;event_time&quot;).now(), field(&quot;temperature&quot;)));
options.withCQLStatementTupleMapper(insertTemperatureValues);
CassandraStateFactory insertValuesStateFactory = new CassandraStateFactory(options);
TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
stream = stream.stateQuery(selectState, new Fields(&quot;weather_station_id&quot;), new CassandraQuery(), new Fields(&quot;name&quot;));
stream = stream.each(new Fields(&quot;name&quot;), new PrintFunction(), new Fields(&quot;name_x&quot;));
stream.partitionPersist(insertValuesStateFactory, new Fields(&quot;weather_station_id&quot;, &quot;name&quot;, &quot;event_time&quot;, &quot;temperature&quot;), new CassandraStateUpdater(), new Fields());
</code></p>
<p>Below <code>state</code> API for <code>querying</code> data from Cassandra.
<code>java
CassandraState.Options options = new CassandraState.Options(new CassandraContext());
CQLStatementTupleMapper insertTemperatureValues = boundQuery(&quot;SELECT name FROM weather.station WHERE id = ?&quot;)
.bind(with(field(&quot;weather_station_id&quot;).as(&quot;id&quot;)));
options.withCQLStatementTupleMapper(insertTemperatureValues);
options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields(&quot;name&quot;)));
CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);
CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
stream = stream.stateQuery(selectState, new Fields(&quot;weather_station_id&quot;), new CassandraQuery(), new Fields(&quot;name&quot;));
</code></p>
<h2 id="license">License</h2>
<p>Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
&quot;License&quot;); you may not use this file except in compliance
with the License. You may obtain a copy of the License at</p>
<p><a href="http://www.apache.org/licenses/LICENSE-2.0">http://www.apache.org/licenses/LICENSE-2.0</a></p>
<p>Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
&quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.</p>
<h2 id="committer-sponsors">Committer Sponsors</h2>
<ul>
<li>Sriharha Chintalapani (<a href="mailto:sriharsha@apache.org">sriharsha@apache.org</a>)</li>
<li>P. Taylor Goetz (<a href="mailto:ptgoetz@apache.org">ptgoetz@apache.org</a>)</li>
</ul>
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
<!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Storm</h5>
<p>Storm integrates with any queueing system and any database system. Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2015 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>