| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Storm Kinesis</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| |
| |
| <body> |
| <header> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-5"> |
| <a href="/index.html"><img src="/images/logo.png" class="logo" /></a> |
| </div> |
| <div class="col-md-5"> |
| |
| <h1>Version: 2.0.0</h1> |
| |
| </div> |
| <div class="col-md-2"> |
| <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a> |
| </div> |
| </div> |
| </div> |
| </header> |
| <!--Header End--> |
| <!--Navigation Begin--> |
| <div class="navbar" role="banner"> |
| <div class="container-fluid"> |
| <div class="navbar-header"> |
| <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| <li><a href="/index.html" id="home">Home</a></li> |
| <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li> |
| <li><a href="/about/integrates.html" id="project-info">Project Information</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| |
| |
| <li><a href="/releases/2.0.0/index.html">2.0.0</a></li> |
| |
| |
| |
| <li><a href="/releases/1.2.3/index.html">1.2.3</a></li> |
| |
| |
| </ul> |
| </li> |
| <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li> |
| <li><a href="/contribute/People.html">People</a></li> |
| <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> |
| </ul> |
| </li> |
| <li><a href="/2019/07/18/storm123-released.html" id="news">News</a></li> |
| </ul> |
| </nav> |
| </div> |
| </div> |
| |
| |
| |
| <div class="container-fluid"> |
| <h1 class="page-title">Storm Kinesis</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <div class="documentation-content"><h1 id="storm-kinesis-spout">Storm Kinesis Spout</h1> |
| |
| <p>Provides a core Storm spout for consuming data from a stream in Amazon Kinesis Streams. The spout stores the sequence numbers that can be committed in ZooKeeper and, |
| by default, resumes consuming records after the committed sequence number on restart. The code sample below creates a sample topology that uses the spout. Each |
| object used to configure the spout is explained after the sample. Ideally, the number of spout tasks should equal the number of shards in the Kinesis stream. However, each task |
| can read from more than one shard.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">KinesisSpoutTopology</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span> <span class="o">(</span><span class="n">String</span> <span class="n">args</span><span class="o">[])</span> <span class="kd">throws</span> <span class="n">InvalidTopologyException</span><span class="o">,</span> <span class="n">AuthorizationException</span><span class="o">,</span> <span class="n">AlreadyAliveException</span> <span class="o">{</span> |
| <span class="n">String</span> <span class="n">topologyName</span> <span class="o">=</span> <span class="n">args</span><span class="o">[</span><span class="mi">0</span><span class="o">];</span> |
| <span class="n">RecordToTupleMapper</span> <span class="n">recordToTupleMapper</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TestRecordToTupleMapper</span><span class="o">();</span> |
| <span class="n">KinesisConnectionInfo</span> <span class="n">kinesisConnectionInfo</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KinesisConnectionInfo</span><span class="o">(</span><span class="k">new</span> <span class="n">CredentialsProviderChain</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ClientConfiguration</span><span class="o">(),</span> <span class="n">Regions</span><span class="o">.</span><span class="na">US_WEST_2</span><span class="o">,</span> |
| <span class="mi">1000</span><span class="o">);</span> |
| <span class="n">ZkInfo</span> <span class="n">zkInfo</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ZkInfo</span><span class="o">(</span><span class="s">"localhost:2181"</span><span class="o">,</span> <span class="s">"/kinesisOffsets"</span><span class="o">,</span> <span class="mi">20000</span><span class="o">,</span> <span class="mi">15000</span><span class="o">,</span> <span class="mi">10000L</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">2000</span><span class="o">);</span> |
| <span class="n">KinesisConfig</span> <span class="n">kinesisConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KinesisConfig</span><span class="o">(</span><span class="n">args</span><span class="o">[</span><span class="mi">1</span><span class="o">],</span> <span class="n">ShardIteratorType</span><span class="o">.</span><span class="na">TRIM_HORIZON</span><span class="o">,</span> |
| <span class="n">recordToTupleMapper</span><span class="o">,</span> <span class="k">new</span> <span class="n">Date</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ExponentialBackoffRetrier</span><span class="o">(),</span> <span class="n">zkInfo</span><span class="o">,</span> <span class="n">kinesisConnectionInfo</span><span class="o">,</span> <span class="mi">10000L</span><span class="o">);</span> |
| <span class="n">KinesisSpout</span> <span class="n">kinesisSpout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KinesisSpout</span><span class="o">(</span><span class="n">kinesisConfig</span><span class="o">);</span> |
| <span class="n">TopologyBuilder</span> <span class="n">topologyBuilder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span> |
| <span class="n">topologyBuilder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"spout"</span><span class="o">,</span> <span class="n">kinesisSpout</span><span class="o">,</span> <span class="mi">3</span><span class="o">);</span> |
| <span class="n">topologyBuilder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"bolt"</span><span class="o">,</span> <span class="k">new</span> <span class="n">KinesisBoltTest</span><span class="o">(),</span> <span class="mi">1</span><span class="o">).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span> |
| <span class="n">Config</span> <span class="n">topologyConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span> |
| <span class="n">topologyConfig</span><span class="o">.</span><span class="na">setDebug</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span> |
| <span class="n">topologyConfig</span><span class="o">.</span><span class="na">setNumWorkers</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span> |
| <span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="n">topologyName</span><span class="o">,</span> <span class="n">topologyConfig</span><span class="o">,</span> <span class="n">topologyBuilder</span><span class="o">.</span><span class="na">createTopology</span><span class="o">());</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>As the sample shows, the spout takes a <code>KinesisConfig</code> object in its constructor. The <code>KinesisConfig</code> constructor takes 8 arguments, each explained below.</p> |
| |
| <h4 id="string-streamname"><code>String</code> streamName</h4> |
| |
| <p>The name of the Kinesis stream to consume data from.</p> |
| |
| <h4 id="sharditeratortype-sharditeratortype"><code>ShardIteratorType</code> shardIteratorType</h4> |
| |
| <p>Three types are supported: TRIM_HORIZON (beginning of the shard), LATEST and AT_TIMESTAMP. By default this argument is ignored if state for the shards |
| is found in ZooKeeper, so it only applies the first time a topology is started. To use any of these types in subsequent runs of the topology, you |
| need to clear the state of the ZooKeeper node used for storing sequence numbers.</p> |
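<p>The rule above can be sketched as a small decision function. This is only an illustration under stated assumptions, not the spout's actual code: the <code>StartPosition</code> class, its <code>resolve</code> method and the local <code>IteratorType</code> enum are hypothetical names introduced here, and resuming with <code>AFTER_SEQUENCE_NUMBER</code> is an assumption based on the statement that the spout consumes records after the committed sequence number on restart.</p>

```java
import java.util.Optional;

/** Hypothetical helper: picks where a shard should start reading. */
final class StartPosition {
    /** Stand-in for the iterator types discussed above (not the AWS SDK enum). */
    enum IteratorType { TRIM_HORIZON, LATEST, AT_TIMESTAMP, AFTER_SEQUENCE_NUMBER }

    final IteratorType type;
    final String sequenceNumber; // non-null only for AFTER_SEQUENCE_NUMBER

    private StartPosition(IteratorType type, String sequenceNumber) {
        this.type = type;
        this.sequenceNumber = sequenceNumber;
    }

    /**
     * If a committed sequence number exists for the shard, it wins and the
     * configured type is ignored; otherwise the configured type applies.
     */
    static StartPosition resolve(IteratorType configured, Optional<String> committed) {
        if (committed.isPresent()) {
            return new StartPosition(IteratorType.AFTER_SEQUENCE_NUMBER, committed.get());
        }
        return new StartPosition(configured, null);
    }
}
```

<p>With no stored state, <code>resolve(LATEST, Optional.empty())</code> keeps <code>LATEST</code>; once a sequence number has been stored for the shard, the configured type no longer matters.</p>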
| |
| <h4 id="recordtotuplemapper-recordtotuplemapper"><code>RecordToTupleMapper</code> recordToTupleMapper</h4> |
| |
| <p>An implementation of the <code>RecordToTupleMapper</code> interface that the spout calls to convert a Kinesis record to a Storm tuple. It has two methods. getOutputFields |
| tells the spout which fields will be present in the tuple emitted from the getTuple method. If getTuple returns null, the record is acked.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="n">Fields</span> <span class="nf">getOutputFields</span> <span class="o">();</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">></span> <span class="nf">getTuple</span> <span class="o">(</span><span class="n">Record</span> <span class="n">record</span><span class="o">);</span> |
| </code></pre></div> |
| <h4 id="date-timestamp"><code>Date</code> timestamp</h4> |
| |
| <p>Used in conjunction with the AT_TIMESTAMP shardIteratorType argument. It makes the spout fetch records from Kinesis starting at that time or later. The |
| time used is the server-side timestamp that Kinesis associates with the record.</p> |
| |
| <h4 id="failedmessageretryhadnler-failedmessageretryhandler"><code>FailedMessageRetryHandler</code> failedMessageRetryHandler</h4> |
| |
| <p>An implementation of the <code>FailedMessageRetryHandler</code> interface. By default this module provides an implementation that supports an exponential backoff retry |
| mechanism for failed messages. That implementation has two constructors. The default no-args constructor configures the first retry at 100 milliseconds and |
| subsequent retries at Math.pow(2, i-1) seconds, where i is the retry number in the range 2 to Long.MAX_VALUE and 2 is the base of the exponential function in seconds. |
| The other constructor takes the retry interval in milliseconds for the first retry as its first argument, the base of the exponential function in seconds as its second argument and the number of |
| retries as its third argument. The methods of this interface and how they work together with the spout are explained below.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="kt">boolean</span> <span class="nf">failed</span> <span class="o">(</span><span class="n">KinesisMessageId</span> <span class="n">messageId</span><span class="o">);</span> |
| <span class="n">KinesisMessageId</span> <span class="nf">getNextFailedMessageToRetry</span> <span class="o">();</span> |
| <span class="kt">void</span> <span class="nf">failedMessageEmitted</span> <span class="o">(</span><span class="n">KinesisMessageId</span> <span class="n">messageId</span><span class="o">);</span> |
| <span class="kt">void</span> <span class="nf">acked</span> <span class="o">(</span><span class="n">KinesisMessageId</span> <span class="n">messageId</span><span class="o">);</span> |
| </code></pre></div> |
| <p>The failed method is called for every tuple that failed in the spout. It should return true if that failed message is scheduled to be retried, false otherwise.</p> |
| |
| <p>The getNextFailedMessageToRetry method is called first, every time the spout wants to emit a tuple. It should return a message that should be retried, |
| if there is one, or null otherwise. Note that it can return null when it has no message to retry at that moment. However, it should eventually |
| return every message for which it returned true when the failed method was called for that message.</p> |
| |
| <p>failedMessageEmitted is called if the spout successfully manages to get the record from Kinesis and emit it. If it is not called, the implementation should return the same |
| message when getNextFailedMessageToRetry is called again.</p> |
| |
| <p>acked is called once the failed message has been re-emitted and successfully acked by the spout. If the re-emitted message fails again, failed is called again.</p> |
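<p>As a rough illustration of the exponential backoff schedule described for the default retry handler, the sketch below computes the delay before each retry. It follows one reading of the description (first retry after the initial delay in milliseconds, retry i after base^(i-1) seconds) and is not the module's implementation. <code>BackoffSchedule</code> and <code>delayMillis</code> are hypothetical names, and returning -1 when the retries are used up is a choice made for this sketch.</p>

```java
/** Hypothetical sketch of an exponential backoff retry schedule. */
final class BackoffSchedule {
    private final long initialDelayMillis; // delay before the first retry
    private final long baseSeconds;        // base of the exponential function, in seconds
    private final long maxRetries;         // retries allowed per message

    BackoffSchedule(long initialDelayMillis, long baseSeconds, long maxRetries) {
        this.initialDelayMillis = initialDelayMillis;
        this.baseSeconds = baseSeconds;
        this.maxRetries = maxRetries;
    }

    /** Delay in milliseconds before retry number retryNum (1-based), or -1 if no retry is left. */
    long delayMillis(long retryNum) {
        if (retryNum < 1 || retryNum > maxRetries) {
            return -1L; // retries exhausted (or invalid retry number)
        }
        if (retryNum == 1) {
            return initialDelayMillis;
        }
        double millis = Math.pow(baseSeconds, retryNum - 1) * 1000.0;
        // Saturate instead of overflowing for very large retry numbers.
        return millis >= Long.MAX_VALUE ? Long.MAX_VALUE : (long) millis;
    }

    public static void main(String[] args) {
        // Mirrors the defaults described above: 100 ms, base 2, effectively unlimited retries.
        BackoffSchedule schedule = new BackoffSchedule(100L, 2L, Long.MAX_VALUE);
        for (long i = 1; i <= 5; i++) {
            System.out.println("retry " + i + " after " + schedule.delayMillis(i) + " ms");
        }
    }
}
```

<p>With these defaults the first five delays come out as 100, 2000, 4000, 8000 and 16000 milliseconds. A handler built on such a schedule would return false from failed once <code>delayMillis</code> reports that no retry is left.</p>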
| |
| <h4 id="zkinfo-zkinfo"><code>ZkInfo</code> zkInfo</h4> |
| |
| <p>An object encapsulating the information needed to interact with ZooKeeper. The constructor takes the following arguments, in order: zkUrl, a comma-separated string of ZooKeeper hosts and |
| ports; zkNode, used as the root node for storing committed sequence numbers; the session timeout in milliseconds; the connection timeout |
| in milliseconds; the commit interval in milliseconds for committing sequence numbers to ZooKeeper; the number of retry attempts for the ZooKeeper client |
| connection; and the retry interval in milliseconds to wait before retrying to connect.</p> |
| |
| <h4 id="kinesisconnectioninfo-kinesisconnectioninfo"><code>KinesisConnectionInfo</code> kinesisConnectionInfo</h4> |
| |
| <p>An object that captures the arguments for connecting to Kinesis using the Kinesis client. Its constructor takes an implementation of <code>AWSCredentialsProvider</code> |
| as its first argument. This module provides an implementation called <code>CredentialsProviderChain</code> that allows the spout to authenticate with Kinesis using one of |
| 5 mechanisms, in this order: <code>EnvironmentVariableCredentialsProvider</code>, <code>SystemPropertiesCredentialsProvider</code>, <code>ClasspathPropertiesFileCredentialsProvider</code>, |
| <code>InstanceProfileCredentialsProvider</code>, <code>ProfileCredentialsProvider</code>. The second argument is a <code>ClientConfiguration</code> object for configuring the Kinesis |
| client, the third is a <code>Regions</code> value that sets the region the client connects to, and the fourth is recordsLimit, the maximum number |
| of records the Kinesis client retrieves for every GetRecords request. Choose this limit carefully, based on the size of the records, the Kinesis |
| throughput rate limits and the per-tuple latency in Storm for the topology. If one task reads from more than one shard, that also affects |
| the choice of limit.</p> |
| |
| <h4 id="long-maxuncommittedrecords"><code>Long</code> maxUncommittedRecords</h4> |
| |
| <p>The maximum number of uncommitted sequence numbers allowed per task. Once this number is reached, the spout will not fetch any new records from |
| Kinesis. The number of uncommitted sequence numbers is the total number of messages for a task that have not been committed to ZooKeeper. This is different from |
| the topology-level max pending messages. For example, suppose this value is set to 10 and the spout has emitted sequence numbers 1 to 10, with sequence number 1 pending |
| and 2 to 10 acked. In that case the number of uncommitted sequence numbers is 10, since no sequence number in the range 1 to 10 can be committed to ZooKeeper. |
| However, Storm can still call nextTuple on the spout because there is only 1 pending message.</p> |
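<p>The example above can be traced with a small bookkeeping sketch. It assumes that only a contiguous run of acked sequence numbers, starting from the lowest emitted one, can be committed, which is what the example implies. <code>UncommittedTracker</code> and its methods are hypothetical names, and plain <code>long</code> values stand in for real Kinesis sequence numbers, which are much larger decimal strings.</p>

```java
import java.util.TreeMap;

/** Hypothetical per-task bookkeeping of emitted but not yet committed sequence numbers. */
final class UncommittedTracker {
    // Emitted sequence number -> whether it has been acked; sorted by sequence number.
    private final TreeMap<Long, Boolean> emitted = new TreeMap<>();
    private final long maxUncommittedRecords;

    UncommittedTracker(long maxUncommittedRecords) {
        this.maxUncommittedRecords = maxUncommittedRecords;
    }

    void emit(long sequenceNumber) {
        emitted.put(sequenceNumber, Boolean.FALSE);
    }

    void ack(long sequenceNumber) {
        // Ignore acks for sequence numbers that are not tracked (for example, already committed).
        emitted.computeIfPresent(sequenceNumber, (key, acked) -> Boolean.TRUE);
    }

    /** Number of emitted sequence numbers that have not been committed yet. */
    int uncommitted() {
        return emitted.size();
    }

    /** New records may be fetched only while the uncommitted count is below the limit. */
    boolean canFetchMore() {
        return emitted.size() < maxUncommittedRecords;
    }

    /**
     * Commits the longest acked run starting at the lowest emitted sequence number.
     * Returns the highest committed sequence number, or -1 if nothing could be committed.
     */
    long commit() {
        long committed = -1L;
        while (!emitted.isEmpty() && emitted.firstEntry().getValue()) {
            committed = emitted.pollFirstEntry().getKey();
        }
        return committed;
    }
}
```

<p>Emitting 1 to 10 and acking 2 to 10 leaves 10 uncommitted sequence numbers, so <code>canFetchMore()</code> is false for a limit of 10 even though only one message is pending. Acking sequence number 1 lets <code>commit()</code> advance to 10 and fetching resumes.</p>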
| |
| <h3 id="maven-dependencies">Maven dependencies</h3> |
| |
| <p>The AWS SDK version this module was tested with is 1.10.77.</p> |
| <div class="highlight"><pre><code class="language-xml" data-lang="xml"> <span class="nt"><dependencies></span> |
| <span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>com.amazonaws<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>aws-java-sdk<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>${aws-java-sdk.version}<span class="nt"></version></span> |
| <span class="nt"><scope></span>provided<span class="nt"></scope></span> |
| <span class="nt"></dependency></span> |
| <span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.storm<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>storm-client<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>${project.version}<span class="nt"></version></span> |
| <span class="nt"><scope></span>provided<span class="nt"></scope></span> |
| <span class="nt"></dependency></span> |
| <span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.curator<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>curator-framework<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>${curator.version}<span class="nt"></version></span> |
| <span class="nt"><exclusions></span> |
| <span class="nt"><exclusion></span> |
| <span class="nt"><groupId></span>log4j<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>log4j<span class="nt"></artifactId></span> |
| <span class="nt"></exclusion></span> |
| <span class="nt"><exclusion></span> |
| <span class="nt"><groupId></span>org.slf4j<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>slf4j-log4j12<span class="nt"></artifactId></span> |
| <span class="nt"></exclusion></span> |
| <span class="nt"></exclusions></span> |
| <span class="nt"></dependency></span> |
| <span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>com.googlecode.json-simple<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>json-simple<span class="nt"></artifactId></span> |
| <span class="nt"></dependency></span> |
| <span class="nt"></dependencies></span> |
| </code></pre></div> |
| <h1 id="future-work">Future Work</h1> |
| |
| <p>Handle merging and splitting of shards in Kinesis, add a Trident spout implementation and add metrics.</p> |
| </div> |
| |
| |
| </div> |
| </div> |
| </div> |
| <footer> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Meetups</h5> |
| <ul class="latest-news"> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li> |
| |
| <li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li> |
| |
| <li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li> |
| |
| <li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li> |
| |
| <!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> --> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>About Apache Storm</h5> |
| <p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>First Look</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/Rationale.html">Rationale</a></li> |
| <li><a href="/releases/current/Tutorial.html">Tutorial</a></li> |
| <li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li> |
| <li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Documentation</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/index.html">Index</a></li> |
| <li><a href="/releases/current/javadocs/index.html">Javadoc</a></li> |
| <li><a href="/releases/current/FAQ.html">FAQ</a></li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| <hr/> |
| <div class="row"> |
| <div class="col-md-12"> |
| <p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. |
| <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. |
| <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <!--Footer End--> |
| <!-- Scroll to top --> |
| <span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> |
| |
| </body> |
| |
| </html> |
| |