blob: 663fe10d47c12e746031c54a105379aaadcbdb65 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2024/11/25/introducing-the-new-amazon-kinesis-data-stream-and-amazon-dynamodb-stream-sources/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="We are pleased to introduce updated versions of the Amazon Kinesis Data Stream and Amazon DynamoDB Stream sources. Built on the FLIP-27 source interface, these newer connectors introduce 7 new features and are compatible with Flink 2.0.
The new KinesisStreamsSource replaces the legacy FlinkKinesisConsumer; and the new DynamoDbStreamsSource replaces the legacy FlinkDynamoDBStreamsConsumer. The new connectors are available for Flink 1.19 onwards, and AWS Connector version 5.0.0 onwards. For more information, see the section on Dependencies.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Introducing the new Amazon Kinesis Data Stream and Amazon DynamoDB Stream sources" />
<meta property="og:description" content="We are pleased to introduce updated versions of the Amazon Kinesis Data Stream and Amazon DynamoDB Stream sources. Built on the FLIP-27 source interface, these newer connectors introduce 7 new features and are compatible with Flink 2.0.
The new KinesisStreamsSource replaces the legacy FlinkKinesisConsumer; and the new DynamoDbStreamsSource replaces the legacy FlinkDynamoDBStreamsConsumer. The new connectors are available for Flink 1.19 onwards, and AWS Connector version 5.0.0 onwards. For more information, see the section on Dependencies." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2024/11/25/introducing-the-new-amazon-kinesis-data-stream-and-amazon-dynamodb-stream-sources/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2024-11-25T18:00:00+00:00" />
<meta property="article:modified_time" content="2024-11-25T18:00:00+00:00" />
<title>Introducing the new Amazon Kinesis Data Stream and Amazon DynamoDB Stream sources | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.b58d961779f91cae8414117efac138dcbed605c935bfb22393047cf18fc734bd.js" integrity="sha256-tY2WF3n5HK6EFBF&#43;&#43;sE43L7WBck1v7IjkwR88Y/HNL0="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Matomo page-view tracking for the Flink website (analytics.apache.org).
// Tracker commands are queued on the global `_paq` array; the matomo.js
// script injected below is loaded asynchronously and (per the standard
// Matomo async pattern) consumes this queue once it has loaded.
var _paq = window._paq = window._paq || [];
// Track without setting cookies.
_paq.push(['disableCookies']);
// Treat the main site and the nightlies docs as the same tracked property.
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
  // Explicit https: a protocol-relative "//" URL resolves to file:// (and
  // fails) for local previews, or to insecure http on a plain-http page.
  var u = "https://analytics.apache.org/";
  _paq.push(['setTrackerUrl', u + 'matomo.php']);
  _paq.push(['setSiteId', '1']);
  // Insert the async loader before the first <script> element in the page.
  var d = document, g = d.createElement('script'), s = d.getElementsByTagName('script')[0];
  g.async = true;
  g.src = u + 'matomo.js';
  s.parentNode.insertBefore(g, s);
})();
</script>
</head>
<body dir="ltr">
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 2.1 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-lts/">Flink 1.20 (LTS)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.12 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.5 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2024/11/25/introducing-the-new-amazon-kinesis-data-stream-and-amazon-dynamodb-stream-sources/">Introducing the new Amazon Kinesis Data Stream and Amazon DynamoDB Stream sources</a>
</h1>
November 25, 2024 -
Hong Liang Teoh
<p>We are pleased to introduce updated versions of the Amazon Kinesis Data Stream and Amazon DynamoDB Stream sources. Built on the <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A&#43;Refactor&#43;Source&#43;Interface">FLIP-27 source interface</a>, these newer connectors introduce 7 new features and are compatible with Flink 2.0.</p>
<p>The new <a href="https://github.com/apache/flink-connector-aws/blob/v5.0/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java"><code>KinesisStreamsSource</code></a> replaces the legacy <a href="https://github.com/apache/flink-connector-aws/blob/v5.0/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java"><code>FlinkKinesisConsumer</code></a>; and the new <a href="https://github.com/apache/flink-connector-aws/blob/v5.0/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java"><code>DynamoDbStreamsSource</code></a> replaces the legacy <a href="https://github.com/apache/flink-connector-aws/blob/v5.0/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java"><code>FlinkDynamoDBStreamsConsumer</code></a>. The new connectors are available for Flink 1.19 onwards, and AWS Connector version 5.0.0 onwards. For more information, see the section on <a href="#dependencies">Dependencies</a>.</p>
<p>In this blog post, we dive into the motivation for the new source connectors and the improvements they introduce, and provide migration guidance for users.</p>
<h2 id="dependencies">
Dependencies
<a class="anchor" href="#dependencies">#</a>
</h2>
<table>
<tr>
<th>Connector</th>
<th>API</th>
<th>Dependency</th>
<th>Usage</th>
</tr>
<tr>
<td>Amazon Kinesis Data Streams source</td>
<td>DataStream<br>Table API</td>
<td> Use the <code>flink-connector-aws-kinesis-streams</code> artifact. See <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/">
Flink Kinesis connector documentation</a> for details.
</td>
<td>
Use the fluent
<a href="https://github.com/apache/flink-connector-aws/blob/v5.0/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java">
KinesisStreamsSourceBuilder</a> to create the source. Look at the <a href="#example-migrating-flinkkinesisconsumer-to-kinesisstreamssource">migration guidance section</a> for more details.
</td>
</tr>
<tr>
<td>Amazon Kinesis Data Streams source</td>
<td>SQL</td>
<td> Use the <code>flink-sql-connector-aws-kinesis-streams</code> artifact. See <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kinesis/">
Flink SQL Kinesis connector documentation</a> for details.
</td>
<td>
Use the table identifier <code>kinesis</code>. See the
<a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kinesis/">
Flink SQL Kinesis connector documentation</a> for configuration and usage details.
</td>
</tr>
<tr>
<td>Amazon DynamoDB Streams source</td>
<td>DataStream</td>
<td> Use the <code>flink-connector-dynamodb</code> artifact. See <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/">
Flink DynamoDB connector documentation</a> for details.
</td>
<td>
Use the fluent
<a href="https://github.com/apache/flink-connector-aws/blob/v5.0/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSourceBuilder.java">
DynamoDbStreamsSourceBuilder</a> to create the source. Look at the <a href="#example-migrating-flinkdynamodbstreamsconsumer-to-dynamodbstreamssource">migration guidance section</a> for more details.
</td>
</tr>
</table>
<h2 id="why-did-we-need-new-source-connectors">
Why did we need new source connectors?
<a class="anchor" href="#why-did-we-need-new-source-connectors">#</a>
</h2>
<p>We implemented new source connectors because the <code>FlinkKinesisConsumer</code> and <code>FlinkDynamoDBStreamsConsumer</code> use the deprecated <code>SourceFunction</code> interface, which has been removed in Flink 2.x. From Flink 2.x onwards, only <code>KinesisStreamsSource</code> and <code>DynamoDbStreamsSource</code>, which use the new <code>Source</code> interface, will be supported.</p>
<p>In addition, the new <code>Source</code> interface introduces new features and standardisation across various Flink sources, such as <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A&#43;Standardize&#43;Connector&#43;Metrics">unified metrics</a>, <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/">native watermark handling</a>, and support for coordination via a <code>SourceEnumerator</code> component running on the JobManager.</p>
<h2 id="new-features">
New features
<a class="anchor" href="#new-features">#</a>
</h2>
<p>The updated <code>KinesisStreamsSource</code> and <code>DynamoDbStreamsSource</code> connectors offer the following new features:</p>
<ol>
<li><strong>Native Flink watermark integration.</strong> With the new <code>Source</code> interface, watermark generation is handled by the Flink framework and is no longer the responsibility of the source. This means the new source supports watermark alignment and idle watermark handling out-of-the-box.</li>
<li><strong>Standardised Flink Source metrics.</strong> The new <code>Source</code> framework also introduces <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A&#43;Standardize&#43;Connector&#43;Metrics">standardised Source metrics</a>. This enables users to track record throughput and lag across sources in a standardised manner.</li>
<li><strong>Records are read in order even after a resharding operation on the stream.</strong> The new <code>Source</code> ensures that parent shards are read completely before their child shards are read. This allows record ordering to be maintained even after a resharding operation. See <a href="#appendix-detailed-explanation-of-record-ordering">explanation of record ordering in Kinesis Data Streams</a> for more information.</li>
<li><strong>Migrate away from AWS SDK v1 to AWS SDK v2.</strong> This SDK update aligns with best practices.</li>
<li><strong>Migrate away from custom retry strategies to use the AWS SDK native retry strategies.</strong> This allows us to benefit from AWS error classification in the retry algorithm.</li>
<li><strong>Reduce jar size by &gt;99%, from <a href="https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kinesis/5.0.0-1.20">~60MB</a> to <a href="https://mvnrepository.com/artifact/org.apache.flink/flink-connector-aws-kinesis-streams/5.0.0-1.20">~200KB</a>.</strong> In the new source, we no longer shade the AWS SDK and no longer need to package the Kinesis Producer Library (needed for the legacy sink). This leads to smaller Flink application jars. Note that users still need to shade the AWS SDK into the final Flink application jar; however, only one copy of the AWS SDK is needed.</li>
<li><strong>Improve defaults.</strong> The <code>UniformShardAssigner</code> is now the default shard assigner. This change results in a uniform shard distribution across Flink subtasks and can reduce unexpected processing skew.</li>
</ol>
<h2 id="breaking-changes">
Breaking changes
<a class="anchor" href="#breaking-changes">#</a>
</h2>
<p>During the implementation of the source Table API, we had to introduce some breaking changes around the table identifier <code>kinesis</code>. This necessitated a major version bump from <code>4.x</code> to <code>5.x</code>. In Table API / SQL, for version <code>4.x</code> and below, <code>kinesis</code> refers to the old <code>FlinkKinesisConsumer</code>. However, from <code>5.x</code> onwards, <code>kinesis</code> refers to the new <code>KinesisStreamsSource</code>. To use the old <code>FlinkKinesisConsumer</code> with <code>5.x</code>, use the table identifier <code>kinesis-legacy</code>. See the <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kinesis/">Kinesis Table API documentation</a> for more details.</p>
<h2 id="migration-guidance">
Migration guidance
<a class="anchor" href="#migration-guidance">#</a>
</h2>
<p>There is no state compatibility between the legacy sources (<code>FlinkKinesisConsumer</code> and <code>FlinkDynamoDBStreamsConsumer</code>), and the new sources (<code>KinesisStreamsSource</code> and <code>DynamoDbStreamsSource</code>). This means that in order to migrate from the legacy source to the new source, users must drop the state of the source operator and start from a specified starting position, to prevent any data loss.</p>
<h3 id="example-migrating-flinkkinesisconsumer-to-kinesisstreamssource">
Example migrating <code>FlinkKinesisConsumer</code> to <code>KinesisStreamsSource</code>
<a class="anchor" href="#example-migrating-flinkkinesisconsumer-to-kinesisstreamssource">#</a>
</h3>
<p>Here we show a simple example to migrate from <code>FlinkKinesisConsumer</code> to <code>KinesisStreamsSource</code>. See <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/">Flink Kinesis connector documentation</a> for more details.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">StreamExecutionEnvironment</span><span class="w"> </span><span class="n">env</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">StreamExecutionEnvironment</span><span class="p">.</span><span class="na">getExecutionEnvironment</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Old FlinkKinesisConsumer to read from stream test-stream from TRIM_HORIZON</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">Properties</span><span class="w"> </span><span class="n">consumerConfig</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">consumerConfig</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="n">AWSConfigConstants</span><span class="p">.</span><span class="na">AWS_REGION</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;us-east-1&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">consumerConfig</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="n">ConsumerConfigConstants</span><span class="p">.</span><span class="na">STREAM_INITIAL_POSITION</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;TRIM_HORIZON&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">FlinkKinesisConsumer</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">oldKinesisConsumer</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">FlinkKinesisConsumer</span><span class="o">&lt;&gt;</span><span class="p">(</span><span class="s">&#34;test-stream&#34;</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">SimpleStringSchema</span><span class="p">(),</span><span class="w"> </span><span class="n">consumerConfig</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">kinesisRecordsFromOldKinesisConsumer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="n">oldKinesisConsumer</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">uid</span><span class="p">(</span><span class="s">&#34;custom-uid&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">assignTimestampsAndWatermarks</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">WatermarkStrategy</span><span class="p">.</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">forMonotonousTimestamps</span><span class="p">().</span><span class="na">withIdleness</span><span class="p">(</span><span class="n">Duration</span><span class="p">.</span><span class="na">ofSeconds</span><span class="p">(</span><span class="n">1</span><span class="p">)));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// New KinesisStreamsSource to read from stream test-stream from TRIM_HORIZON</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">Configuration</span><span class="w"> </span><span class="n">sourceConfig</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Configuration</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">sourceConfig</span><span class="p">.</span><span class="na">set</span><span class="p">(</span><span class="n">KinesisSourceConfigOptions</span><span class="p">.</span><span class="na">STREAM_INITIAL_POSITION</span><span class="p">,</span><span class="w"> </span><span class="n">KinesisSourceConfigOptions</span><span class="p">.</span><span class="na">InitialPosition</span><span class="p">.</span><span class="na">TRIM_HORIZON</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">KinesisStreamsSource</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">newKdsSource</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">KinesisStreamsSource</span><span class="p">.</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">builder</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">setStreamArn</span><span class="p">(</span><span class="s">&#34;arn:aws:kinesis:us-east-1:123456789012:stream/test-stream&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">setSourceConfig</span><span class="p">(</span><span class="n">sourceConfig</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">setDeserializationSchema</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">SimpleStringSchema</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">build</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">kinesisRecordsWithEventTimeWatermarks</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">fromSource</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">newKdsSource</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">WatermarkStrategy</span><span class="p">.</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">forMonotonousTimestamps</span><span class="p">().</span><span class="na">withIdleness</span><span class="p">(</span><span class="n">Duration</span><span class="p">.</span><span class="na">ofSeconds</span><span class="p">(</span><span class="n">1</span><span class="p">)),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s">&#34;Kinesis source&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">returns</span><span class="p">(</span><span class="n">TypeInformation</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">String</span><span class="p">.</span><span class="na">class</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">uid</span><span class="p">(</span><span class="s">&#34;custom-uid&#34;</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><h3 id="example-migrating-flinkdynamodbstreamsconsumer-to-dynamodbstreamssource">
Example migrating <code>FlinkDynamoDBStreamsConsumer</code> to <code>DynamoDbStreamsSource</code>
<a class="anchor" href="#example-migrating-flinkdynamodbstreamsconsumer-to-dynamodbstreamssource">#</a>
</h3>
<p>Here we show a simple example to migrate from <code>FlinkDynamoDBStreamsConsumer</code> to <code>DynamoDbStreamsSource</code>. See <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/">Flink DynamoDB connector documentation</a> for more details.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">StreamExecutionEnvironment</span><span class="w"> </span><span class="n">env</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">StreamExecutionEnvironment</span><span class="p">.</span><span class="na">getExecutionEnvironment</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Old FlinkDynamoDBStreamsConsumer to read from stream test stream from TRIM_HORIZON</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">Properties</span><span class="w"> </span><span class="n">consumerConfig</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">consumerConfig</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="n">AWSConfigConstants</span><span class="p">.</span><span class="na">AWS_REGION</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;us-east-1&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">consumerConfig</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="n">ConsumerConfigConstants</span><span class="p">.</span><span class="na">STREAM_INITIAL_POSITION</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;TRIM_HORIZON&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">FlinkDynamoDBStreamsConsumer</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">oldDynamodbStreamsConsumer</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">FlinkDynamoDBStreamsConsumer</span><span class="o">&lt;&gt;</span><span class="p">(</span><span class="s">&#34;arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380&#34;</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">SimpleStringSchema</span><span class="p">(),</span><span class="w"> </span><span class="n">consumerConfig</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">dynamodbRecordsFromOldDynamodbStreamsConsumer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="n">oldDynamodbStreamsConsumer</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">uid</span><span class="p">(</span><span class="s">&#34;custom-uid&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">assignTimestampsAndWatermarks</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">WatermarkStrategy</span><span class="p">.</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">forMonotonousTimestamps</span><span class="p">().</span><span class="na">withIdleness</span><span class="p">(</span><span class="n">Duration</span><span class="p">.</span><span class="na">ofSeconds</span><span class="p">(</span><span class="n">1</span><span class="p">)));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// New DynamoDbStreamsSource to read the stream of table test from TRIM_HORIZON</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">Configuration</span><span class="w"> </span><span class="n">sourceConfig</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Configuration</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">sourceConfig</span><span class="p">.</span><span class="na">set</span><span class="p">(</span><span class="n">DynamodbStreamsSourceConfigConstants</span><span class="p">.</span><span class="na">STREAM_INITIAL_POSITION</span><span class="p">,</span><span class="w"> </span><span class="n">DynamodbStreamsSourceConfigConstants</span><span class="p">.</span><span class="na">InitialPosition</span><span class="p">.</span><span class="na">TRIM_HORIZON</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DynamoDbStreamsSource</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">newDynamoDbStreamsSource</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">DynamoDbStreamsSource</span><span class="p">.</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">builder</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">setStreamArn</span><span class="p">(</span><span class="s">&#34;arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">setSourceConfig</span><span class="p">(</span><span class="n">sourceConfig</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// User must implement their own deserialization schema to translate change data capture events into custom data types </span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">setDeserializationSchema</span><span class="p">(</span><span class="n">dynamodbDeserializationSchema</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">build</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">dynamodbRecordsWithEventTimeWatermarks</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">fromSource</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">newDynamoDbStreamsSource</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">WatermarkStrategy</span><span class="p">.</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">forMonotonousTimestamps</span><span class="p">().</span><span class="na">withIdleness</span><span class="p">(</span><span class="n">Duration</span><span class="p">.</span><span class="na">ofSeconds</span><span class="p">(</span><span class="n">1</span><span class="p">)),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s">&#34;DynamoDB Streams source&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">returns</span><span class="p">(</span><span class="n">TypeInformation</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">String</span><span class="p">.</span><span class="na">class</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">uid</span><span class="p">(</span><span class="s">&#34;custom-uid&#34;</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><h2 id="summary">
Summary
<a class="anchor" href="#summary">#</a>
</h2>
<p>In this blog post, we have covered the motivation behind creating the new <code>KinesisStreamsSource</code> and <code>DynamoDbStreamsSource</code> connectors, their new features, and migration guidance. Feel free to reach out on the Flink mailing list (<a href="mailto:dev@flink.apache.org">dev@flink.apache.org</a>) or the Flink Slack to discuss any further improvements.</p>
<p>To get started with the connectors, follow one of the guides below!</p>
<ul>
<li><a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/#kinesis-streams-source"><strong>Amazon Kinesis Data Streams Source</strong></a></li>
<li><a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/#amazon-dynamodb-streams-source"><strong>Amazon DynamoDB Streams Source</strong></a></li>
</ul>
<h2 id="further-work">
Further work
<a class="anchor" href="#further-work">#</a>
</h2>
<p>Support for the following additional features is also in progress:</p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-34340">Support for Table API and SQL for the DynamoDB Streams Source.</a></li>
<li><a href="https://issues.apache.org/jira/browse/FLINK-31988">DataStream Python integration</a> for both sources is not implemented yet, but we recognize its importance to users.</li>
</ul>
<h2 id="list-of-contributors">
List of Contributors
<a class="anchor" href="#list-of-contributors">#</a>
</h2>
<p>Abhi Gupta, Aleksandr Pilipenko, Burak Ozakinci, Elphas Toringepi, Lorenzo Nicora, Danny Cranmer, Hong Teoh</p>
<h2 id="appendix-detailed-explanation-of-record-ordering">
Appendix: Detailed explanation of record ordering
<a class="anchor" href="#appendix-detailed-explanation-of-record-ordering">#</a>
</h2>
<p>Kinesis Data Streams is a scalable streaming data store, so the data records belonging to a stream are segregated across multiple shards based on the partition key specified when writing each record. Record ordering is maintained only within the same shard. Since records with the same partition key are written to the same shard, record ordering is maintained for a given partition key.</p>
<p>The situation gets complicated when the stream is resharded to scale the stream read/write capacity. During the resharding of a stream, shards go through split and merge operations. A split operation splits one parent shard into two smaller child shards. A merge operation merges two parent shards into one larger child shard.</p>
<p>An example of resharding a stream from 2 open shards to 3 open shards is shown below.</p>
<center>
<br/>
<img src="/img/blog/2024-11-25-whats-new-aws-connectors/kinesis_resharding.png" alt="Diagram of a Kinesis Data Stream resharding from 2 open shards to 3 open shards" width="60%"/>
<br/>
Fig. 1 - Illustration of a uniform resharding event in a Kinesis Data Stream, from 2 shards to 3 shards.
</center>
<p>In the diagram, the stream goes from having 2 open shards (0, 1) to having 3 open shards (2, 5, 6) and 4 closed shards (0, 1, 3, 4). Closed shards can contain records, but will no longer receive any new records, whereas open shards can still receive new records. The reason for multiple split/merge operations during this resharding is to prevent record skew across shards.</p>
<p>The diagram below illustrates what could happen when records with a given ordering within the same partition key are written to the stream.</p>
<center>
<br/>
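<img src="/img/blog/2024-11-25-whats-new-aws-connectors/kinesis_records_sharding.png" alt="Diagram of records for each partition key distributed across parent and child shards after resharding" width="60%"/>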
<br/>
Fig. 2 - Illustration of record distribution within a Kinesis Data Stream after a resharding operation.
</center>
<p>As we can see, to ensure that records with partition key <code>pk2</code> are read in order, we need to ensure that the shards are read in the order <code>Shard 0</code>, then <code>Shard 3</code>, then <code>Shard 6</code>. Put more simply: all parent shards must be fully read before their child shards can be read.</p>
<p>The new <code>KinesisStreamsSource</code> ensures that parent shards are read completely before reading child shards, and so maintains record ordering even after a resharding operation on the stream.</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2024-11-25-whats-new-aws-connectors-5.0.0.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#dependencies">Dependencies</a></li>
<li><a href="#why-did-we-need-new-source-connectors">Why did we need new source connectors?</a></li>
<li><a href="#new-features">New features</a></li>
<li><a href="#breaking-changes">Breaking changes</a></li>
<li><a href="#migration-guidance">Migration guidance</a>
<ul>
<li><a href="#example-migrating-flinkkinesisconsumer-to-kinesisstreamssource">Example migrating <code>FlinkKinesisConsumer</code> to <code>KinesisStreamsSource</code></a></li>
<li><a href="#example-migrating-flinkdynamodbstreamsconsumer-to-dynamodbstreamssource">Example migrating <code>FlinkDynamoDBStreamsConsumer</code> to <code>DynamoDbStreamsSource</code></a></li>
</ul>
</li>
<li><a href="#summary">Summary</a></li>
<li><a href="#further-work">Further work</a></li>
<li><a href="#list-of-contributors">List of Contributors</a></li>
<li><a href="#appendix-detailed-explanation-of-record-ordering">Appendix: Detailed explanation of record ordering</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>