<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>Spark Streaming + Kinesis Integration - Spark 3.4.3 Documentation</title>
<link rel="stylesheet" href="css/bootstrap.min.css">
<style>
body {
padding-top: 60px;
padding-bottom: 40px;
}
</style>
<meta name="viewport" content="width=device-width">
<link rel="stylesheet" href="css/main.css">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="css/pygments-default.css">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/docsearch.js@2/dist/cdn/docsearch.min.css" />
<link rel="stylesheet" href="css/docsearch.css">
<!-- Matomo -->
<script>
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body>
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<nav class="navbar fixed-top navbar-expand-md navbar-light bg-light" id="topbar">
<div class="container">
<div class="navbar-header">
<div class="navbar-brand"><a href="index.html">
<img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">3.4.3</span>
</div>
</div>
<button class="navbar-toggler" type="button" data-toggle="collapse"
data-target="#navbarCollapse" aria-controls="navbarCollapse"
aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarCollapse">
<ul class="navbar-nav">
<!--TODO(andyk): Add class="active" attribute to li some how.-->
<li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a>
<div class="dropdown-menu" aria-labelledby="navbarQuickStart">
<a class="dropdown-item" href="quick-start.html">Quick Start</a>
<a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a>
<a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a>
<a class="dropdown-item" href="structured-streaming-programming-guide.html">Structured Streaming</a>
<a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a>
<a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a>
<a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a>
<a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a>
<a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a>
<div class="dropdown-menu" aria-labelledby="navbarAPIDocs">
<a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a>
<a class="dropdown-item" href="api/java/index.html">Java</a>
<a class="dropdown-item" href="api/python/index.html">Python</a>
<a class="dropdown-item" href="api/R/index.html">R</a>
<a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a>
<div class="dropdown-menu" aria-labelledby="navbarDeploying">
<a class="dropdown-item" href="cluster-overview.html">Overview</a>
<a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a>
<a class="dropdown-item" href="running-on-mesos.html">Mesos</a>
<a class="dropdown-item" href="running-on-yarn.html">YARN</a>
<a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a>
<div class="dropdown-menu" aria-labelledby="navbarMore">
<a class="dropdown-item" href="configuration.html">Configuration</a>
<a class="dropdown-item" href="monitoring.html">Monitoring</a>
<a class="dropdown-item" href="tuning.html">Tuning Guide</a>
<a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a>
<a class="dropdown-item" href="security.html">Security</a>
<a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a>
<a class="dropdown-item" href="migration-guide.html">Migration Guide</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="building-spark.html">Building Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a>
</div>
</li>
<li class="nav-item">
<input type="text" id="docsearch-input" placeholder="Search the docs…">
</li>
</ul>
<!--<span class="navbar-text navbar-right"><span class="version-text">v3.4.3</span></span>-->
</div>
</div>
</nav>
<div class="container-wrapper">
<div class="content mr-3" id="content">
<h1 class="title">Spark Streaming + Kinesis Integration</h1>
<p><a href="http://aws.amazon.com/kinesis/">Amazon Kinesis</a> is a fully managed service for real-time processing of streaming data at massive scale.
The Kinesis receiver creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL).
The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load balancing, fault tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.
Here we explain how to configure Spark Streaming to receive data from Kinesis.</p>
<h4 id="configuring-kinesis">Configuring Kinesis</h4>
<p>A Kinesis stream can be set up at one of the valid Kinesis endpoints with one or more shards, following this
<a href="http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html">guide</a>.</p>
<h4 id="configuring-spark-streaming-application">Configuring Spark Streaming Application</h4>
<ol>
<li>
<p><strong>Linking:</strong> For Scala/Java applications using SBT/Maven project definitions, link your streaming application against the following artifact (see <a href="streaming-programming-guide.html#linking">Linking section</a> in the main programming guide for further information).</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.12
version = 3.4.3
</code></pre></div> </div>
<p>For Python applications, you will have to add the above library and its dependencies when deploying your application. See the <em>Deploying</em> subsection below.
<strong>Note that by linking to this library, you will include <a href="https://aws.amazon.com/asl/">ASL</a>-licensed code in your application.</strong></p>
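<p>For sbt builds, an equivalent dependency declaration is the following one-liner (the <code class="language-plaintext highlighter-rouge">%%</code> operator appends the Scala binary version, here <code class="language-plaintext highlighter-rouge">_2.12</code>):</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "3.4.3"
</code></pre></div> </div>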
</li>
<li>
<p><strong>Programming:</strong> In the streaming application code, import <code class="language-plaintext highlighter-rouge">KinesisInputDStream</code> and create the input DStream of byte arrays as follows:</p>
<div class="codetabs">
<div data-lang="scala">
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
val kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.metricsLevel([metrics level])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
</code></pre></div> </div>
<p>See the <a href="api/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.html">API docs</a>
and the <a href="https://github.com/apache/spark/tree/master/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala">example</a>. Refer to the <a href="#running-the-example">Running the Example</a> subsection for instructions on how to run the example.</p>
</div>
<div data-lang="java">
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
KinesisInputDStream&lt;byte[]&gt; kinesisStream = KinesisInputDStream.builder()
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.metricsLevel([metrics level])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build();
</code></pre></div> </div>
<p>See the <a href="api/java/index.html?org/apache/spark/streaming/kinesis/KinesisInputDStream.html">API docs</a>
and the <a href="https://github.com/apache/spark/tree/master/connector/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java">example</a>. Refer to the <a href="#running-the-example">Running the Example</a> subsection for instructions to run the example.</p>
</div>
<div data-lang="python">
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
</code></pre></div> </div>
<p>See the <a href="api/python/reference/pyspark.streaming.html#kinesis">API docs</a>
and the <a href="https://github.com/apache/spark/tree/master/connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py">example</a>. Refer to the <a href="#running-the-example">Running the Example</a> subsection for instructions to run the example.</p>
<ul>
<li><code class="language-plaintext highlighter-rouge">metricsLevel</code>: CloudWatch metrics level and dimensions. See <a href="https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html">the AWS documentation about monitoring KCL</a> for details. The default is <code class="language-plaintext highlighter-rouge">MetricsLevel.DETAILED</code>.</li>
</ul>
</div>
</div>
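<p>As a concrete illustration, here is a minimal Scala sketch with the placeholders filled in; the stream name, application name, region, and batch interval are hypothetical and must match your own setup:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

 val ssc = new StreamingContext(new SparkConf().setAppName("KinesisExample"), Seconds(2))
 val kinesisStream = KinesisInputDStream.builder
   .streamingContext(ssc)
   .endpointUrl("https://kinesis.us-east-1.amazonaws.com") // endpoint of the stream's region
   .regionName("us-east-1")
   .streamName("myKinesisStream")     // hypothetical stream name
   .initialPosition(new KinesisInitialPositions.Latest())
   .checkpointAppName("myKinesisApp") // hypothetical app name; names the backing DynamoDB table
   .checkpointInterval(Seconds(2))    // match the batch interval to start with
   .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
   .build()
 // The stream yields raw byte arrays; decode the payload as needed, e.g.:
 val lines = kinesisStream.map(bytes =&gt; new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
</code></pre></div> </div>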
<p>You may also provide the following settings. This is currently only supported in Scala and Java.</p>
<ul>
<li>A &#8220;message handler function&#8221; that takes a Kinesis <code class="language-plaintext highlighter-rouge">Record</code> and returns a generic object <code class="language-plaintext highlighter-rouge">T</code>, in case you would like to use other data included in a <code class="language-plaintext highlighter-rouge">Record</code>, such as the partition key.</li>
</ul>
<div class="codetabs">
<div data-lang="scala">
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> import collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
val kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.metricsLevel(MetricsLevel.DETAILED)
.metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
.buildWithMessageHandler([message handler])
</code></pre></div> </div>
</div>
<div data-lang="java">
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import scala.collection.JavaConverters;
KinesisInputDStream&lt;byte[]&gt; kinesisStream = KinesisInputDStream.builder()
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.metricsLevel(MetricsLevel.DETAILED)
.metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
.buildWithMessageHandler([message handler]);
</code></pre></div> </div>
</div>
</div>
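<p>For instance, a minimal sketch of a message handler that keeps the partition key alongside the payload (the tuple type is just one illustrative choice for <code class="language-plaintext highlighter-rouge">T</code>):</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> import com.amazonaws.services.kinesis.model.Record

 // Copy the payload out of the record's ByteBuffer and pair it with the partition key.
 val messageHandler: Record =&gt; (String, Array[Byte]) = { record =&gt;
   val bytes = new Array[Byte](record.getData.remaining())
   record.getData.get(bytes)
   (record.getPartitionKey, bytes)
 }
 // Pass it to the builder via .buildWithMessageHandler(messageHandler)
</code></pre></div> </div>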
<ul>
<li>
<p><code class="language-plaintext highlighter-rouge">streamingContext</code>: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream</p>
</li>
<li><code class="language-plaintext highlighter-rouge">[Kinesis app name]</code>: The application name that will be used to checkpoint the Kinesis
sequence numbers in the DynamoDB table.
<ul>
<li>The application name must be unique for a given account and region.</li>
<li>If the table exists but has incorrect checkpoint information (for a different stream, or
old expired sequence numbers), then there may be temporary errors.</li>
</ul>
</li>
<li>
<p><code class="language-plaintext highlighter-rouge">[Kinesis stream name]</code>: The Kinesis stream that this streaming application will pull data from.</p>
</li>
<li>
<p><code class="language-plaintext highlighter-rouge">[endpoint URL]</code>: Valid Kinesis endpoint URLs can be found <a href="http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region">here</a>.</p>
</li>
<li>
<p><code class="language-plaintext highlighter-rouge">[region name]</code>: Valid Kinesis region names can be found <a href="https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html">here</a>.</p>
</li>
<li>
<p><code class="language-plaintext highlighter-rouge">[checkpoint interval]</code>: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.</p>
</li>
<li>
<p><code class="language-plaintext highlighter-rouge">[initial position]</code>: Can be either <code class="language-plaintext highlighter-rouge">KinesisInitialPositions.TrimHorizon</code> or <code class="language-plaintext highlighter-rouge">KinesisInitialPositions.Latest</code> or <code class="language-plaintext highlighter-rouge">KinesisInitialPositions.AtTimestamp</code> (see the <a href="#kinesis-checkpointing"><code class="language-plaintext highlighter-rouge">Kinesis Checkpointing</code></a> section and the <a href="http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html"><code class="language-plaintext highlighter-rouge">Amazon Kinesis API documentation</code></a> for more details). A short <code class="language-plaintext highlighter-rouge">AtTimestamp</code> sketch follows this list.</p>
</li>
<li><code class="language-plaintext highlighter-rouge">[message handler]</code>: A function that takes a Kinesis <code class="language-plaintext highlighter-rouge">Record</code> and outputs generic <code class="language-plaintext highlighter-rouge">T</code>.</li>
</ul>
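<p>As a short illustration, <code class="language-plaintext highlighter-rouge">AtTimestamp</code> takes a <code class="language-plaintext highlighter-rouge">java.util.Date</code> in UTC; the timestamp below is arbitrary:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> import java.util.Date
 import org.apache.spark.streaming.kinesis.KinesisInitialPositions

 // Start from records added at or after this (illustrative) point in time,
 // then pass it to the builder: .initialPosition(position)
 val position = new KinesisInitialPositions.AtTimestamp(new Date(1700000000000L))
</code></pre></div> </div>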
<p>In other versions of the API, you can also specify the AWS access key and secret key directly.</p>
</li>
<li>
<p><strong>Deploying:</strong> As with any Spark application, <code class="language-plaintext highlighter-rouge">spark-submit</code> is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.</p>
<p>For Scala and Java applications, if you are using SBT or Maven for project management, then package <code class="language-plaintext highlighter-rouge">spark-streaming-kinesis-asl_2.12</code> and its dependencies into the application JAR. Make sure <code class="language-plaintext highlighter-rouge">spark-core_2.12</code> and <code class="language-plaintext highlighter-rouge">spark-streaming_2.12</code> are marked as <code class="language-plaintext highlighter-rouge">provided</code> dependencies as those are already present in a Spark installation. Then use <code class="language-plaintext highlighter-rouge">spark-submit</code> to launch your application (see <a href="streaming-programming-guide.html#deploying-applications">Deploying section</a> in the main programming guide).</p>
<p>For Python applications which lack SBT/Maven project management, <code class="language-plaintext highlighter-rouge">spark-streaming-kinesis-asl_2.12</code> and its dependencies can be directly added to <code class="language-plaintext highlighter-rouge">spark-submit</code> using <code class="language-plaintext highlighter-rouge">--packages</code> (see <a href="submitting-applications.html">Application Submission Guide</a>). That is,</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.4.3 ...
</code></pre></div> </div>
<p>Alternatively, you can also download the JAR of the Maven artifact <code class="language-plaintext highlighter-rouge">spark-streaming-kinesis-asl-assembly</code> from the
<a href="http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kinesis-asl-assembly_2.12%22%20AND%20v%3A%223.4.3%22">Maven repository</a> and add it to <code class="language-plaintext highlighter-rouge">spark-submit</code> with <code class="language-plaintext highlighter-rouge">--jars</code>.</p>
<p style="text-align: center;">
<img src="img/streaming-kinesis-arch.png" title="Spark Streaming Kinesis Architecture" alt="Spark Streaming Kinesis Architecture" width="60%" />
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>
<p><em>Points to remember at runtime:</em></p>
<ul>
<li>
<p>Kinesis data processing is ordered per partition and occurs at least once per message.</p>
</li>
<li>
<p>Multiple applications can read from the same Kinesis stream. Kinesis will maintain the application-specific shard and checkpoint info in DynamoDB.</p>
</li>
<li>
<p>A single Kinesis stream shard is processed by one input DStream at a time.</p>
</li>
<li>
<p>A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads.</p>
</li>
<li>
<p>Multiple input DStreams running in separate processes/instances can read from a Kinesis stream.</p>
</li>
<li>
<p>You never need more Kinesis input DStreams than the number of Kinesis stream shards, as each input DStream will create at least one KinesisRecordProcessor thread that handles a single shard.</p>
</li>
<li>
<p>Horizontal scaling is achieved by adding/removing Kinesis input DStreams (within a single process or across multiple processes/instances) - up to the total number of Kinesis stream shards per the previous point (see the union sketch after this list).</p>
</li>
<li>
<p>The Kinesis input DStream will balance the load across all DStreams - even across processes/instances.</p>
</li>
<li>
<p>The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load.</p>
</li>
<li>
<p>As a best practice, it&#8217;s recommended that you avoid re-shard jitter by over-provisioning when possible.</p>
</li>
<li>
<p>Each Kinesis input DStream maintains its own checkpoint info. See the Kinesis Checkpointing section for more details.</p>
</li>
<li>
<p>There is no correlation between the number of Kinesis stream shards and the number of RDD partitions/shards created across the Spark cluster during input DStream processing. These are two independent partitioning schemes.</p>
</li>
</ul>
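<p>A minimal sketch of that scale-out pattern, following the approach of the <code class="language-plaintext highlighter-rouge">KinesisWordCountASL</code> example and reusing the imports, streaming context, and illustrative names from the sketch above:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> // One input DStream (and thus one receiver) per shard, then a single unioned stream.
 val numShards = 2 // illustrative; use the stream's actual shard count
 val kinesisStreams = (0 until numShards).map { _ =&gt;
   KinesisInputDStream.builder
     .streamingContext(ssc)
     .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
     .regionName("us-east-1")
     .streamName("myKinesisStream")
     .initialPosition(new KinesisInitialPositions.Latest())
     .checkpointAppName("myKinesisApp")
     .checkpointInterval(Seconds(2))
     .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
     .build()
 }
 val unionStream = ssc.union(kinesisStreams)
</code></pre></div> </div>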
</li>
</ol>
<h4 id="running-the-example">Running the Example</h4>
<p>To run the example,</p>
<ul>
<li>
<p>Download a Spark binary from the <a href="https://spark.apache.org/downloads.html">download site</a>.</p>
</li>
<li>
<p>Set up a Kinesis stream (see the earlier section) within AWS. Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created.</p>
</li>
<li>
<p>Set up the environment variables <code class="language-plaintext highlighter-rouge">AWS_ACCESS_KEY_ID</code> and <code class="language-plaintext highlighter-rouge">AWS_SECRET_ACCESS_KEY</code> with your AWS credentials.</p>
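<p>For example, in a POSIX shell:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> export AWS_ACCESS_KEY_ID=[your AWS access key]
 export AWS_SECRET_ACCESS_KEY=[your AWS secret key]
</code></pre></div> </div>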
</li>
<li>
<p>In the Spark root directory, run the example as</p>
<div class="codetabs">
<div data-lang="scala">
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> ./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.4.3 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
</code></pre></div> </div>
</div>
<div data-lang="java">
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> ./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.4.3 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
</code></pre></div> </div>
</div>
<div data-lang="python">
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> ./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \
connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
[Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
</code></pre></div> </div>
</div>
</div>
<p>This will wait for data to be received from the Kinesis stream.</p>
</li>
<li>
<p>To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> ./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
</code></pre></div> </div>
<p>This will push 1000 lines per second, each containing 10 random numbers, to the Kinesis stream. This data should then be received and processed by the running example.</p>
</li>
</ul>
<h4 id="record-de-aggregation">Record De-aggregation</h4>
<p>When data is generated using the <a href="http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html">Kinesis Producer Library (KPL)</a>, messages may be aggregated for cost savings. Spark Streaming will automatically
de-aggregate records during consumption.</p>
<h4 id="kinesis-checkpointing">Kinesis Checkpointing</h4>
<ul>
<li>
<p>Each Kinesis input DStream periodically stores the current position of the stream in the backing DynamoDB table. This allows the system to recover from failures and continue processing where the DStream left off.</p>
</li>
<li>
<p>Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy.</p>
</li>
<li>
<p>If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (<code class="language-plaintext highlighter-rouge">KinesisInitialPositions.TrimHorizon</code>), or from the latest tip (<code class="language-plaintext highlighter-rouge">KinesisInitialPositions.Latest</code>), or (except Python) from the position denoted by the provided UTC timestamp (<code class="language-plaintext highlighter-rouge">KinesisInitialPositions.AtTimestamp(Date timestamp)</code>). This is configurable.</p>
<ul>
<li><code class="language-plaintext highlighter-rouge">KinesisInitialPositions.Latest</code> could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).</li>
<li><code class="language-plaintext highlighter-rouge">KinesisInitialPositions.TrimHorizon</code> may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.</li>
</ul>
</li>
</ul>
<h4 id="kinesis-retry-configuration">Kinesis retry configuration</h4>
<ul>
<li><code class="language-plaintext highlighter-rouge">spark.streaming.kinesis.retry.waitTime</code> : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit <code class="language-plaintext highlighter-rouge">ProvisionedThroughputExceededException</code>s when consuming faster than 5 read transactions per second per shard or exceeding the maximum read rate of 2 MiB per second per shard. This configuration can be tweaked to increase the sleep between fetches when a fetch fails, reducing these exceptions. Default is &#8220;100ms&#8221;.</li>
<li><code class="language-plaintext highlighter-rouge">spark.streaming.kinesis.retry.maxAttempts</code> : Max number of retries for Kinesis fetches. This config can also be used to tackle the Kinesis <code class="language-plaintext highlighter-rouge">ProvisionedThroughputExceededException</code>s in the scenarios mentioned above. It can be increased to allow more retries for Kinesis reads. Default is 3.</li>
</ul>
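<p>Both settings are ordinary Spark configurations and can be passed to <code class="language-plaintext highlighter-rouge">spark-submit</code>; the values below are illustrative:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code> ./bin/spark-submit \
   --conf spark.streaming.kinesis.retry.waitTime=500ms \
   --conf spark.streaming.kinesis.retry.maxAttempts=5 \
   ...
</code></pre></div> </div>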
</div>
<!-- /container -->
</div>
<script src="js/vendor/jquery-3.5.1.min.js"></script>
<script src="js/vendor/bootstrap.bundle.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>
<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/docsearch.js@2/dist/cdn/docsearch.min.js"></script>
<script type="text/javascript">
// DocSearch is entirely free and automated. DocSearch is built in two parts:
// 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link
// in your website and extracts content from every page it traverses. It then pushes this
// content to an Algolia index.
// 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index
// to your search input and display its results in a dropdown UI. If you want to find more
// details on how DocSearch works, check the DocSearch docs.
docsearch({
apiKey: 'd62f962a82bc9abb53471cb7b89da35e',
appId: 'RAI69RXRSK',
indexName: 'apache_spark',
inputSelector: '#docsearch-input',
enhancedSearchInput: true,
algoliaOptions: {
'facetFilters': ["version:3.4.3"]
},
debug: false // Set debug to true if you want to inspect the dropdown
});
</script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
'?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>