<!DOCTYPE html>
<html>
    <head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">

    <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
    <link rel="icon" href="/favicon.ico" type="image/x-icon">

    <title>Storm Kinesis</title>

    <!-- Bootstrap core CSS -->
    <link href="/assets/css/bootstrap.min.css" rel="stylesheet">
    <!-- Bootstrap theme -->
    <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">

    <!-- Custom styles for this template -->
    <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
    <link href="/css/style.css" rel="stylesheet">
    <link href="/assets/css/owl.theme.css" rel="stylesheet">
    <link href="/assets/css/owl.carousel.css" rel="stylesheet">
    <script type="text/javascript" src="/assets/js/jquery.min.js"></script>
    <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
    <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
    <script type="text/javascript" src="/assets/js/storm.js"></script>
    <!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
    <!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
    
    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
    <!--[if lt IE 9]>
      <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
      <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
    <![endif]-->
  </head>


  <body>
    <header>
  <div class="container-fluid">
     <div class="row">
          <div class="col-md-5">
            <a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
          </div>
          <div class="col-md-5">
            
              <h1>Version: 2.1.0</h1>
            
          </div>
          <div class="col-md-2">
            <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
          </div>
        </div>
    </div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
  <div class="container-fluid">
      <div class="navbar-header">
          <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
            </button>
        </div>
        <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
          <ul class="nav navbar-nav">
              <li><a href="/index.html" id="home">Home</a></li>
                <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
                <li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
                <li class="dropdown">
                    <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
                    <ul class="dropdown-menu">
                      
                        
                          <li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
                        
                      
                        
                          <li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
                        
                      
                        
                          <li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
                        
                      
                    </ul>
                </li>
                <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
                <li class="dropdown">
                    <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
                    <ul class="dropdown-menu">
                        <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
                        <li><a href="/contribute/People.html">People</a></li>
                        <li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
                    </ul>
                </li>
                <li><a href="/2019/10/31/storm210-released.html" id="news">News</a></li>
            </ul>
        </nav>
    </div>
</div>



    <div class="container-fluid">
    <h1 class="page-title">Storm Kinesis</h1>
          <div class="row">
           	<div class="col-md-12">
	             <!-- Documentation -->

<p class="post-meta"></p>

<div class="documentation-content"><h1 id="storm-kinesis-spout">Storm Kinesis Spout</h1>

<p>Provides core storm spout for consuming data from a stream in Amazon Kinesis Streams. It stores the sequence numbers that can be committed in zookeeper and 
starts consuming records after that sequence number on restart by default. Below is the code sample to create a sample topology that uses the spout. Each 
object used in configuring the spout is explained below. Ideally, the number of spout tasks should be equal to number of shards in kinesis. However each task 
can read from more than one shard.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">KinesisSpoutTopology</span> <span class="o">{</span>
    <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span> <span class="o">(</span><span class="n">String</span> <span class="n">args</span><span class="o">[])</span> <span class="kd">throws</span> <span class="n">InvalidTopologyException</span><span class="o">,</span> <span class="n">AuthorizationException</span><span class="o">,</span> <span class="n">AlreadyAliveException</span> <span class="o">{</span>
        <span class="n">String</span> <span class="n">topologyName</span> <span class="o">=</span> <span class="n">args</span><span class="o">[</span><span class="mi">0</span><span class="o">];</span>
        <span class="n">RecordToTupleMapper</span> <span class="n">recordToTupleMapper</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TestRecordToTupleMapper</span><span class="o">();</span>
        <span class="n">KinesisConnectionInfo</span> <span class="n">kinesisConnectionInfo</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KinesisConnectionInfo</span><span class="o">(</span><span class="k">new</span> <span class="n">CredentialsProviderChain</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ClientConfiguration</span><span class="o">(),</span> <span class="n">Regions</span><span class="o">.</span><span class="na">US_WEST_2</span><span class="o">,</span>
                <span class="mi">1000</span><span class="o">);</span>
        <span class="n">ZkInfo</span> <span class="n">zkInfo</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ZkInfo</span><span class="o">(</span><span class="s">"localhost:2181"</span><span class="o">,</span> <span class="s">"/kinesisOffsets"</span><span class="o">,</span> <span class="mi">20000</span><span class="o">,</span> <span class="mi">15000</span><span class="o">,</span> <span class="mi">10000L</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">2000</span><span class="o">);</span>
        <span class="n">KinesisConfig</span> <span class="n">kinesisConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KinesisConfig</span><span class="o">(</span><span class="n">args</span><span class="o">[</span><span class="mi">1</span><span class="o">],</span> <span class="n">ShardIteratorType</span><span class="o">.</span><span class="na">TRIM_HORIZON</span><span class="o">,</span>
                <span class="n">recordToTupleMapper</span><span class="o">,</span> <span class="k">new</span> <span class="n">Date</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ExponentialBackoffRetrier</span><span class="o">(),</span> <span class="n">zkInfo</span><span class="o">,</span> <span class="n">kinesisConnectionInfo</span><span class="o">,</span> <span class="mi">10000L</span><span class="o">);</span>
        <span class="n">KinesisSpout</span> <span class="n">kinesisSpout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KinesisSpout</span><span class="o">(</span><span class="n">kinesisConfig</span><span class="o">);</span>
        <span class="n">TopologyBuilder</span> <span class="n">topologyBuilder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span>
        <span class="n">topologyBuilder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"spout"</span><span class="o">,</span> <span class="n">kinesisSpout</span><span class="o">,</span> <span class="mi">3</span><span class="o">);</span>
        <span class="n">topologyBuilder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"bolt"</span><span class="o">,</span> <span class="k">new</span> <span class="n">KinesisBoltTest</span><span class="o">(),</span> <span class="mi">1</span><span class="o">).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span>
        <span class="n">Config</span> <span class="n">topologyConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span>
        <span class="n">topologyConfig</span><span class="o">.</span><span class="na">setDebug</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
        <span class="n">topologyConfig</span><span class="o">.</span><span class="na">setNumWorkers</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span>
        <span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="n">topologyName</span><span class="o">,</span> <span class="n">topologyConfig</span><span class="o">,</span> <span class="n">topologyBuilder</span><span class="o">.</span><span class="na">createTopology</span><span class="o">());</span>
    <span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<p>As you can see above the spout takes an object of KinesisConfig in its constructor. The constructor of KinesisConfig takes 8 objects as explained below.</p>

<h4 id="string-streamname"><code>String</code> streamName</h4>

<p>name of kinesis stream to consume data from</p>

<h4 id="sharditeratortype-sharditeratortype"><code>ShardIteratorType</code> shardIteratorType</h4>

<p>3 types are supported - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. By default this argument is ignored if state for shards 
is found in zookeeper. Hence they will apply the first time a topology is started. If you want to use any of these in subsequent runs of the topology, you 
will need to clear the state of zookeeper node used for storing sequence numbers</p>

<h4 id="recordtotuplemapper-recordtotuplemapper"><code>RecordToTupleMapper</code> recordToTupleMapper</h4>

<p>an implementation of <code>RecordToTupleMapper</code> interface that spout will call to convert a kinesis record to a storm tuple. It has two methods. getOutputFields 
tells the spout the fields that will be present in the tuple emitted from the getTuple method. If getTuple returns null, the record will be acked</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">    <span class="n">Fields</span> <span class="nf">getOutputFields</span> <span class="o">();</span>
    <span class="n">List</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">&gt;</span> <span class="nf">getTuple</span> <span class="o">(</span><span class="n">Record</span> <span class="n">record</span><span class="o">);</span>
</code></pre></div>
<h4 id="date-timestamp"><code>Date</code> timestamp</h4>

<p>used in conjunction with the AT_TIMESTAMP shardIteratorType argument. This will make the spout fetch records from kinesis starting at that time or later. The
time used by kinesis is the server side time associated to the record by kinesis</p>

<h4 id="failedmessageretryhadnler-failedmessageretryhandler"><code>FailedMessageRetryHadnler</code> failedMessageRetryHandler</h4>

<p>an implementation of the <code>FailedMessageRetryHandler</code> interface. By default this module provides an implementation that supports a exponential backoff retry
mechanism for failed messages. That implementation has two constructors. Default no args constructor will configure first retry at 100 milliseconds and 
subsequent retires at Math.pow(2, i-1) where i is the retry number in the range 2 to LONG.MAX_LONG. 2 represents the base for exponential function in seconds. 
Other constructor takes retry interval in millis for first retry as first argument, base for exponential function in seconds as second argument and number of 
retries as third argument. The methods of this interface and its working in accord with the spout is explained below</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">    <span class="kt">boolean</span> <span class="nf">failed</span> <span class="o">(</span><span class="n">KinesisMessageId</span> <span class="n">messageId</span><span class="o">);</span>
    <span class="n">KinesisMessageId</span> <span class="nf">getNextFailedMessageToRetry</span> <span class="o">();</span>
    <span class="kt">void</span> <span class="nf">failedMessageEmitted</span> <span class="o">(</span><span class="n">KinesisMessageId</span> <span class="n">messageId</span><span class="o">);</span>
    <span class="kt">void</span> <span class="nf">acked</span> <span class="o">(</span><span class="n">KinesisMessageId</span> <span class="n">messageId</span><span class="o">);</span>
</code></pre></div>
<p>failed method will be called on every tuple that failed in the spout. It should return true if that failed message is scheduled to be retried, false otherwise.</p>

<p>getNextFailedMessageToRetry method will be called the first thing every time a spout wants to emit a tuple. It should return a message that should be retried
if any or null otherwise. Note that it can return null in the case it does not have any message to retry as of that moment. However, it should eventually 
return every message for which it returned true when failed method was called for that message</p>

<p>failedMessageEmitted will be called if spout successfully manages to get the record from kinesis and emit it. If not, the implementation should return the same 
message when getNextFailedMessageToRetry is called again</p>

<p>acked will be called once the failed message was re-emitted and successfully acked by the spout. If it was failed by the spout failed will be called again</p>

<h4 id="zkinfo-zkinfo"><code>ZkInfo</code> zkInfo</h4>

<p>an object encapsulating information for zookeeper interaction. The constructor takes zkUrl as first argument which is a comma separated string of zk host and
port, zkNode as second that will be used as the root node for storing committed sequence numbers, session timeout as third in milliseconds, connection timeout
as fourth in milliseconds, commit interval as fifth in milliseconds for committing sequence numbers to zookeeper, retry attempts as sixth for zk client
connection retry attempts, retry interval as seventh in milliseconds for time to wait before retrying to connect. </p>

<h4 id="kinesisconnectioninfo-kinesisconnectioninfo"><code>KinesisConnectionInfo</code> kinesisConnectionInfo</h4>

<p>an object that captures arguments for connecting to kinesis using kinesis client. It has a constructor that takes an implementation of <code>AWSCredentialsProvider</code>
as first argument. This module provides an implementation called <code>CredentialsProviderChain</code> that allows the spout to authenticate with kinesis using one of 
the 5 mechanisms in this order - <code>EnvironmentVariableCredentialsProvider</code>, <code>SystemPropertiesCredentialsProvider</code>, <code>ClasspathPropertiesFileCredentialsProvider</code>, 
<code>InstanceProfileCredentialsProvider</code>, <code>ProfileCredentialsProvider</code>. It takes an object of <code>ClientConfiguration</code> as second argument for configuring the kinesis 
client, <code>Regions</code> as third argument that sets the region to connect to on the client and recordsLimit as the fourth argument which represents the maximum number
of records kinesis client will retrieve for every GetRecords request. This limit should be carefully chosen based on the size of the record, kinesis 
throughput rate limits and per tuple latency in storm for the topology. Also if one task will be reading from more than one shards then that will also affect
the choice of limit argument</p>

<h4 id="long-maxuncommittedrecords"><code>Long</code> maxUncommittedRecords</h4>

<p>this represents the maximum number of uncommitted sequence numbers allowed per task. Once this number is reached spout will not fetch any new records from 
kinesis. Uncommited sequence numbers are defined as the sum of all the messages for a task that have not been committed to zookeeper. This is different from 
topology level max pending messages. For example if this value is set to 10, and the spout emitted sequence numbers from 1 to 10. Sequence number 1 is pending 
and 2 to 10 acked. In that case the number of uncommitted sequence numbers is 10 since no sequence number in the range 1 to 10 can be committed to zk. 
However, storm can still call next tuple on the spout because there is only 1 pending message</p>

<h3 id="maven-dependencies">Maven dependencies</h3>

<p>Aws sdk version that this was tested with is 1.10.77</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml"> <span class="nt">&lt;dependencies&gt;</span>
        <span class="nt">&lt;dependency&gt;</span>
            <span class="nt">&lt;groupId&gt;</span>com.amazonaws<span class="nt">&lt;/groupId&gt;</span>
            <span class="nt">&lt;artifactId&gt;</span>aws-java-sdk<span class="nt">&lt;/artifactId&gt;</span>
            <span class="nt">&lt;version&gt;</span>${aws-java-sdk.version}<span class="nt">&lt;/version&gt;</span>
            <span class="nt">&lt;scope&gt;</span>provided<span class="nt">&lt;/scope&gt;</span>
        <span class="nt">&lt;/dependency&gt;</span>
        <span class="nt">&lt;dependency&gt;</span>
            <span class="nt">&lt;groupId&gt;</span>org.apache.storm<span class="nt">&lt;/groupId&gt;</span>
            <span class="nt">&lt;artifactId&gt;</span>storm-client<span class="nt">&lt;/artifactId&gt;</span>
            <span class="nt">&lt;version&gt;</span>${project.version}<span class="nt">&lt;/version&gt;</span>
            <span class="nt">&lt;scope&gt;</span>provided<span class="nt">&lt;/scope&gt;</span>
        <span class="nt">&lt;/dependency&gt;</span>
        <span class="nt">&lt;dependency&gt;</span>
            <span class="nt">&lt;groupId&gt;</span>org.apache.curator<span class="nt">&lt;/groupId&gt;</span>
            <span class="nt">&lt;artifactId&gt;</span>curator-framework<span class="nt">&lt;/artifactId&gt;</span>
            <span class="nt">&lt;version&gt;</span>${curator.version}<span class="nt">&lt;/version&gt;</span>
            <span class="nt">&lt;exclusions&gt;</span>
                <span class="nt">&lt;exclusion&gt;</span>
                    <span class="nt">&lt;groupId&gt;</span>log4j<span class="nt">&lt;/groupId&gt;</span>
                    <span class="nt">&lt;artifactId&gt;</span>log4j<span class="nt">&lt;/artifactId&gt;</span>
                <span class="nt">&lt;/exclusion&gt;</span>
                <span class="nt">&lt;exclusion&gt;</span>
                    <span class="nt">&lt;groupId&gt;</span>org.slf4j<span class="nt">&lt;/groupId&gt;</span>
                    <span class="nt">&lt;artifactId&gt;</span>slf4j-log4j12<span class="nt">&lt;/artifactId&gt;</span>
                <span class="nt">&lt;/exclusion&gt;</span>
            <span class="nt">&lt;/exclusions&gt;</span>
        <span class="nt">&lt;/dependency&gt;</span>
        <span class="nt">&lt;dependency&gt;</span>
            <span class="nt">&lt;groupId&gt;</span>com.googlecode.json-simple<span class="nt">&lt;/groupId&gt;</span>
            <span class="nt">&lt;artifactId&gt;</span>json-simple<span class="nt">&lt;/artifactId&gt;</span>
        <span class="nt">&lt;/dependency&gt;</span>
 <span class="nt">&lt;/dependencies&gt;</span>
</code></pre></div>
<h1 id="future-work">Future Work</h1>

<p>Handle merging or splitting of shards in kinesis, Trident spout implementation and metrics</p>
</div>


	          </div>
	       </div>
	  </div>
<footer>
    <div class="container-fluid">
        <div class="row">
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>Meetups</h5>
                    <ul class="latest-news">
                        
                        <li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
                        
                        <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
                        
                        <li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
                        
                        <li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
                        
                        <li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
                        
                        <li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
                        
                        <!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
                    </ul>
                </div>
            </div>
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>About Apache Storm</h5>
                    <p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
               </div>
            </div>
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>First Look</h5>
                    <ul class="footer-list">
                        <li><a href="/releases/current/Rationale.html">Rationale</a></li>
                        <li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
                        <li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
                        <li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
                    </ul>
                </div>
            </div>
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>Documentation</h5>
                    <ul class="footer-list">
                        <li><a href="/releases/current/index.html">Index</a></li>
                        <li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
                        <li><a href="/releases/current/FAQ.html">FAQ</a></li>
                    </ul>
                </div>
            </div>
        </div>
        <hr/>
        <div class="row">   
            <div class="col-md-12">
                <p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. 
                    <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. 
                    <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
            </div>
        </div>
    </div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> 

</body>

</html>

