<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Kinesis sink connector · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis."/><meta name="docsearch:version" content="next"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Kinesis sink connector · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>next</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/docs/en/next/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class=""><a href="/docs/en/next/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/next/io-kinesis-sink">日本語</a></li><li><a href="/docs/fr/next/io-kinesis-sink">Français</a></li><li><a href="/docs/ko/next/io-kinesis-sink">한국어</a></li><li><a href="/docs/zh-CN/next/io-kinesis-sink">中文</a></li><li><a href="/docs/zh-TW/next/io-kinesis-sink">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
        const languagesMenuItem = document.getElementById("languages-menu");
        const languagesDropDown = document.getElementById("languages-dropdown");
        languagesMenuItem.addEventListener("click", function(event) {
          event.preventDefault();

          if (languagesDropDown.className == "hide") {
            languagesDropDown.className = "visible";
          } else {
            languagesDropDown.className = "hide";
          }
        });
      </script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/io-kinesis-sink.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Kinesis sink connector</h1></header><article><div><span><p>The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis.</p>
<h2><a class="anchor" aria-hidden="true" id="configuration"></a><a href="#configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h2>
<p>The configuration of the Kinesis sink connector has the following property.</p>
<h3><a class="anchor" aria-hidden="true" id="property"></a><a href="#property" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Property</h3>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th>Required</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>messageFormat</code></td><td>MessageFormat</td><td>true</td><td>ONLY_RAW_PAYLOAD</td><td>Message format in which Kinesis sink converts Pulsar messages and publishes to Kinesis streams.<br/><br/>Below are the available options:<br/><br/><li><code>ONLY_RAW_PAYLOAD</code>: Kinesis sink directly publishes Pulsar message payload as a message into the configured Kinesis stream. <br/><br/><li><code>FULL_MESSAGE_IN_JSON</code>: Kinesis sink creates a JSON payload with Pulsar message payload, properties and encryptionCtx, and publishes JSON payload into the configured Kinesis stream.<br/><br/><li><code>FULL_MESSAGE_IN_FB</code>: Kinesis sink creates a flatbuffer serialized payload with Pulsar message payload, properties and encryptionCtx, and publishes flatbuffer payload into the configured Kinesis stream.<br/><br/><li><code>FULL_MESSAGE_IN_JSON_EXPAND_VALUE</code>: Kinesis sink sends a JSON structure containing the record topic name, key, payload, properties and event time. The record schema is used to convert the value to JSON.</td></tr>
<tr><td><code>jsonIncludeNonNulls</code></td><td>boolean</td><td>false</td><td>true</td><td>Only the properties with non-null values are included when the message format is <code>FULL_MESSAGE_IN_JSON_EXPAND_VALUE</code>.</td></tr>
<tr><td><code>jsonFlatten</code></td><td>boolean</td><td>false</td><td>false</td><td>When it is set to <code>true</code> and the message format is <code>FULL_MESSAGE_IN_JSON_EXPAND_VALUE</code>, the output JSON is flattened.</td></tr>
<tr><td><code>retainOrdering</code></td><td>boolean</td><td>false</td><td>false</td><td>Whether Pulsar connectors to retain ordering when moving messages from Pulsar to Kinesis or not.</td></tr>
<tr><td><code>awsEndpoint</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The Kinesis end-point URL, which can be found at <a href="https://docs.aws.amazon.com/general/latest/gr/rande.html">here</a>.</td></tr>
<tr><td><code>awsRegion</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The AWS region. <br/><br/><strong>Example</strong><br/> us-west-1, us-west-2</td></tr>
<tr><td><code>awsKinesisStreamName</code></td><td>String</td><td>true</td><td>&quot; &quot; (empty string)</td><td>The Kinesis stream name.</td></tr>
<tr><td><code>awsCredentialPluginName</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The fully-qualified class name of implementation of <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java"><code>AwsCredentialProviderPlugin</code></a>
. <br/><br/>It is a factory class which creates an AWSCredentialsProvider that is used by Kinesis sink. <br/><br/>If it is empty, the Kinesis sink creates a default AWSCredentialsProvider which accepts json-map of credentials in <code>awsCredentialPluginParam</code>.</td></tr>
<tr><td><code>awsCredentialPluginParam</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The JSON parameter to initialize <code>awsCredentialsProviderPlugin</code>.</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="built-in-plugins"></a><a href="#built-in-plugins" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Built-in plugins</h3>
<p>The following are built-in <code>AwsCredentialProviderPlugin</code> plugins:</p>
<ul>
<li><p><code>org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin</code></p>
<p>This plugin takes no configuration, it uses the default AWS provider chain.</p>
<p>For more information, see <a href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default">AWS documentation</a>.</p></li>
<li><p><code>org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin</code></p>
<p>This plugin takes a configuration (via the <code>awsCredentialPluginParam</code>) that describes a role to assume when running the KCL.</p>
<p>This configuration takes the form of a small json document like:</p>
<pre><code class="hljs css language-json">{<span class="hljs-attr">"roleArn"</span>: <span class="hljs-string">"arn..."</span>, <span class="hljs-attr">"roleSessionName"</span>: <span class="hljs-string">"name"</span>}
</code></pre></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="example"></a><a href="#example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example</h3>
<p>Before using the Kinesis sink connector, you need to create a configuration file through one of the following methods.</p>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
   <span class="hljs-attr">"configs"</span>: {
      <span class="hljs-attr">"awsEndpoint"</span>: <span class="hljs-string">"some.endpoint.aws"</span>,
      <span class="hljs-attr">"awsRegion"</span>: <span class="hljs-string">"us-east-1"</span>,
      <span class="hljs-attr">"awsKinesisStreamName"</span>: <span class="hljs-string">"my-stream"</span>,
      <span class="hljs-attr">"awsCredentialPluginParam"</span>: <span class="hljs-string">"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"</span>,
      <span class="hljs-attr">"messageFormat"</span>: <span class="hljs-string">"ONLY_RAW_PAYLOAD"</span>,
      <span class="hljs-attr">"retainOrdering"</span>: <span class="hljs-string">"true"</span>
   }
}
</code></pre></li>
<li><p>YAML</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">awsEndpoint:</span> <span class="hljs-string">"some.endpoint.aws"</span>
    <span class="hljs-attr">awsRegion:</span> <span class="hljs-string">"us-east-1"</span>
    <span class="hljs-attr">awsKinesisStreamName:</span> <span class="hljs-string">"my-stream"</span>
    <span class="hljs-attr">awsCredentialPluginParam:</span> <span class="hljs-string">"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"</span>
    <span class="hljs-attr">messageFormat:</span> <span class="hljs-string">"ONLY_RAW_PAYLOAD"</span>
    <span class="hljs-attr">retainOrdering:</span> <span class="hljs-string">"true"</span>
</code></pre></li>
</ul>
</span></div></article></div><div class="docs-prevnext"></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#configuration">Configuration</a><ul class="toc-headings"><li><a href="#property">Property</a></li><li><a href="#built-in-plugins">Built-in plugins</a></li><li><a href="#example">Example</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
      const community = document.querySelector("a[href='#community']").parentNode;
      const communityMenu =
        '<li>' +
        '<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
        '<div id="community-dropdown" class="hide">' +
          '<ul id="community-dropdown-items">' +
            '<li><a href="/en/contact">Contact</a></li>' +
            '<li><a href="/en/contributing">Contributing</a></li>' +
            '<li><a href="/en/coding-guide">Coding guide</a></li>' +
            '<li><a href="/en/events">Events</a></li>' +
            '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
            '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
            '<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
            '<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
            '<li>&nbsp;</li>' +
            '<li><a href="/en/resources">Resources</a></li>' +
            '<li><a href="/en/team">Team</a></li>' +
            '<li><a href="/en/powered-by">Powered By</a></li>' +
          '</ul>' +
        '</div>' +
        '</li>';

      community.innerHTML = communityMenu;

      const communityMenuItem = document.getElementById("community-menu");
      const communityDropDown = document.getElementById("community-dropdown");
      communityMenuItem.addEventListener("click", function(event) {
        event.preventDefault();

        if (communityDropDown.className == 'hide') {
          communityDropDown.className = 'visible';
        } else {
          communityDropDown.className = 'hide';
        }
      });
    </script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>