<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Kinesis source connector · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="The Kinesis source connector pulls data from Amazon Kinesis and persists data into Pulsar."/><meta name="docsearch:version" content="2.10.0"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Kinesis source connector · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="The Kinesis source connector pulls data from Amazon Kinesis and persists data into Pulsar."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.10.0</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/docs/en/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class=""><a href="/docs/en/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/io-kinesis-source">日本語</a></li><li><a href="/docs/fr/io-kinesis-source">Français</a></li><li><a href="/docs/ko/io-kinesis-source">한국어</a></li><li><a href="/docs/zh-CN/io-kinesis-source">中文</a></li><li><a href="/docs/zh-TW/io-kinesis-source">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
        const languagesMenuItem = document.getElementById("languages-menu");
        const languagesDropDown = document.getElementById("languages-dropdown");
        languagesMenuItem.addEventListener("click", function(event) {
          event.preventDefault();

          if (languagesDropDown.className == "hide") {
            languagesDropDown.className = "visible";
          } else {
            languagesDropDown.className = "hide";
          }
        });
      </script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/io-kinesis-source.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Kinesis source connector</h1></header><article><div><span><p>The Kinesis source connector pulls data from Amazon Kinesis and persists data into Pulsar.</p>
<p>This connector uses the <a href="https://github.com/awslabs/amazon-kinesis-client">Kinesis Consumer Library</a> (KCL) to do the actual consuming of messages. The KCL uses DynamoDB to track state for consumers.</p>
<blockquote>
<p>Note: currently, the Kinesis source connector only supports raw messages. If you use KMS encrypted messages, the encrypted messages are sent to downstream. This connector will support decrypting messages in the future release.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="configuration"></a><a href="#configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h2>
<p>The configuration of the Kinesis source connector has the following properties.</p>
<h3><a class="anchor" aria-hidden="true" id="property"></a><a href="#property" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Property</h3>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th>Required</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>initialPositionInStream</code></td><td>InitialPositionInStream</td><td>false</td><td>LATEST</td><td>The position where the connector starts from.<br/><br/>Below are the available options:<br/><br/><li><code>AT_TIMESTAMP</code>: start from the record at or after the specified timestamp.<br/><br/><li><code>LATEST</code>: start after the most recent data record.<br/><br/><li><code>TRIM_HORIZON</code>: start from the oldest available data record.</td></tr>
<tr><td><code>startAtTime</code></td><td>Date</td><td>false</td><td>&quot; &quot; (empty string)</td><td>If set to <code>AT_TIMESTAMP</code>, it specifies the point in time to start consumption.</td></tr>
<tr><td><code>applicationName</code></td><td>String</td><td>false</td><td>Pulsar IO connector</td><td>The name of the Amazon Kinesis application. <br/><br/>By default, the application name is included in the user agent string used to make AWS requests. This can assist with troubleshooting, for example, distinguish requests made by separate connector instances.</td></tr>
<tr><td><code>checkpointInterval</code></td><td>long</td><td>false</td><td>60000</td><td>The frequency of the Kinesis stream checkpoint in milliseconds.</td></tr>
<tr><td><code>backoffTime</code></td><td>long</td><td>false</td><td>3000</td><td>The amount of time to delay between requests when the connector encounters a throttling exception from AWS Kinesis in milliseconds.</td></tr>
<tr><td><code>numRetries</code></td><td>int</td><td>false</td><td>3</td><td>The number of re-attempts when the connector encounters an exception while trying to set a checkpoint.</td></tr>
<tr><td><code>receiveQueueSize</code></td><td>int</td><td>false</td><td>1000</td><td>The maximum number of AWS records that can be buffered inside the connector. <br/><br/>Once the <code>receiveQueueSize</code> is reached, the connector does not consume any messages from Kinesis until some messages in the queue are successfully consumed.</td></tr>
<tr><td><code>dynamoEndpoint</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The Dynamo end-point URL, which can be found at <a href="https://docs.aws.amazon.com/general/latest/gr/rande.html">here</a>.</td></tr>
<tr><td><code>cloudwatchEndpoint</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The Cloudwatch end-point URL, which can be found at <a href="https://docs.aws.amazon.com/general/latest/gr/rande.html">here</a>.</td></tr>
<tr><td><code>useEnhancedFanOut</code></td><td>boolean</td><td>false</td><td>true</td><td>If set to true, it uses Kinesis enhanced fan-out.<br><br>If set to false, it uses polling.</td></tr>
<tr><td><code>awsEndpoint</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The Kinesis end-point URL, which can be found at <a href="https://docs.aws.amazon.com/general/latest/gr/rande.html">here</a>.</td></tr>
<tr><td><code>awsRegion</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The AWS region. <br/><br/><strong>Example</strong><br/> us-west-1, us-west-2</td></tr>
<tr><td><code>awsKinesisStreamName</code></td><td>String</td><td>true</td><td>&quot; &quot; (empty string)</td><td>The Kinesis stream name.</td></tr>
<tr><td><code>awsCredentialPluginName</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The fully-qualified class name of implementation of <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java"><code>AwsCredentialProviderPlugin</code></a>
.<br><br><code>awsCredentialProviderPlugin</code> has the following built-in plugs:<br><br><li><code>org.apache.pulsar.io.kinesis.AwsDefaultProviderChainPlugin</code>:<br> this plugin uses the default AWS provider chain.<br>For more information, see <a href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default">using the default credential provider chain</a>.<br><br><li><code>org.apache.pulsar.io.kinesis.STSAssumeRoleProviderPlugin</code>: <br>this plugin takes a configuration via the <code>awsCredentialPluginParam</code> that describes a role to assume when running the KCL.<br/><strong>JSON configuration example</strong><br/><code>{&quot;roleArn&quot;: &quot;arn...&quot;, &quot;roleSessionName&quot;: &quot;name&quot;}</code> <br/><br/><code>awsCredentialPluginName</code> is a factory class which creates an AWSCredentialsProvider that is used by Kinesis sink. <br/><br/>If <code>awsCredentialPluginName</code> set to empty, the Kinesis sink creates a default AWSCredentialsProvider which accepts json-map of credentials in <code>awsCredentialPluginParam</code>.</td></tr>
<tr><td><code>awsCredentialPluginParam</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The JSON parameter to initialize <code>awsCredentialsProviderPlugin</code>.</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="example"></a><a href="#example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example</h3>
<p>Before using the Kinesis source connector, you need to create a configuration file through one of the following methods.</p>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
   <span class="hljs-attr">"configs"</span>: {
      <span class="hljs-attr">"awsEndpoint"</span>: <span class="hljs-string">"https://some.endpoint.aws"</span>,
      <span class="hljs-attr">"awsRegion"</span>: <span class="hljs-string">"us-east-1"</span>,
      <span class="hljs-attr">"awsKinesisStreamName"</span>: <span class="hljs-string">"my-stream"</span>,
      <span class="hljs-attr">"awsCredentialPluginParam"</span>: <span class="hljs-string">"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"</span>,
      <span class="hljs-attr">"applicationName"</span>: <span class="hljs-string">"My test application"</span>,
      <span class="hljs-attr">"checkpointInterval"</span>: <span class="hljs-string">"30000"</span>,
      <span class="hljs-attr">"backoffTime"</span>: <span class="hljs-string">"4000"</span>,
      <span class="hljs-attr">"numRetries"</span>: <span class="hljs-string">"3"</span>,
      <span class="hljs-attr">"receiveQueueSize"</span>: <span class="hljs-number">2000</span>,
      <span class="hljs-attr">"initialPositionInStream"</span>: <span class="hljs-string">"TRIM_HORIZON"</span>,
      <span class="hljs-attr">"startAtTime"</span>: <span class="hljs-string">"2019-03-05T19:28:58.000Z"</span>
   }
}
</code></pre></li>
<li><p>YAML</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">awsEndpoint:</span> <span class="hljs-string">"https://some.endpoint.aws"</span>
    <span class="hljs-attr">awsRegion:</span> <span class="hljs-string">"us-east-1"</span>
    <span class="hljs-attr">awsKinesisStreamName:</span> <span class="hljs-string">"my-stream"</span>
    <span class="hljs-attr">awsCredentialPluginParam:</span> <span class="hljs-string">"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"</span>
    <span class="hljs-attr">applicationName:</span> <span class="hljs-string">"My test application"</span>
    <span class="hljs-attr">checkpointInterval:</span> <span class="hljs-number">30000</span>
    <span class="hljs-attr">backoffTime:</span> <span class="hljs-number">4000</span>
    <span class="hljs-attr">numRetries:</span> <span class="hljs-number">3</span>
    <span class="hljs-attr">receiveQueueSize:</span> <span class="hljs-number">2000</span>
    <span class="hljs-attr">initialPositionInStream:</span> <span class="hljs-string">"TRIM_HORIZON"</span>
    <span class="hljs-attr">startAtTime:</span> <span class="hljs-string">"2019-03-05T19:28:58.000Z"</span>
</code></pre></li>
</ul>
</span></div></article></div><div class="docs-prevnext"></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#configuration">Configuration</a><ul class="toc-headings"><li><a href="#property">Property</a></li><li><a href="#example">Example</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
      const community = document.querySelector("a[href='#community']").parentNode;
      const communityMenu =
        '<li>' +
        '<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
        '<div id="community-dropdown" class="hide">' +
          '<ul id="community-dropdown-items">' +
            '<li><a href="/en/contact">Contact</a></li>' +
            '<li><a href="/en/contributing">Contributing</a></li>' +
            '<li><a href="/en/coding-guide">Coding guide</a></li>' +
            '<li><a href="/en/events">Events</a></li>' +
            '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
            '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
            '<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
            '<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
            '<li>&nbsp;</li>' +
            '<li><a href="/en/resources">Resources</a></li>' +
            '<li><a href="/en/team">Team</a></li>' +
            '<li><a href="/en/powered-by">Powered By</a></li>' +
          '</ul>' +
        '</div>' +
        '</li>';

      community.innerHTML = communityMenu;

      const communityMenuItem = document.getElementById("community-menu");
      const communityDropDown = document.getElementById("community-dropdown");
      communityMenuItem.addEventListener("click", function(event) {
        event.preventDefault();

        if (communityDropDown.className == 'hide') {
          communityDropDown.className = 'visible';
        } else {
          communityDropDown.className = 'hide';
        }
      });
    </script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>