<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Elasticsearch sink connector · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="The Elasticsearch sink connector pulls messages from Pulsar topics and persists the messages to indexes."/><meta name="docsearch:version" content="2.9.0"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Elasticsearch sink connector · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="The Elasticsearch sink connector pulls messages from Pulsar topics and persists the messages to indexes."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a 
href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.9.0</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/docs/en/2.9.0/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class=""><a href="/docs/en/2.9.0/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.9.0/io-elasticsearch-sink">日本語</a></li><li><a href="/docs/fr/2.9.0/io-elasticsearch-sink">Français</a></li><li><a href="/docs/ko/2.9.0/io-elasticsearch-sink">한국어</a></li><li><a href="/docs/zh-CN/2.9.0/io-elasticsearch-sink">中文</a></li><li><a href="/docs/zh-TW/2.9.0/io-elasticsearch-sink">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><h1 id="__docusaurus" class="postHeaderTitle">Elasticsearch sink connector</h1></header><article><div><span><p>The Elasticsearch sink connector pulls messages from Pulsar topics and persists the messages to indexes.</p>
<h2><a class="anchor" aria-hidden="true" id="feature"></a><a href="#feature" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Feature</h2>
<h3><a class="anchor" aria-hidden="true" id="handle-data"></a><a href="#handle-data" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Handle data</h3>
<p>Since Pulsar 2.9.0, the Elasticsearch sink connector can process messages in
either of the following modes.</p>
<table>
<thead>
<tr><th>Name</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td>Raw processing</td><td>The sink reads from topics and passes the raw content to Elasticsearch. <br><br> This is the <strong>default</strong> behavior. <br><br> Raw processing was already available <strong>in Pulsar 2.8.x</strong>.</td></tr>
<tr><td>Schema aware</td><td>The sink uses the schema and handles AVRO, JSON, and KeyValue schema types while mapping the content to the Elasticsearch document.<br><br> If you set <code>schemaEnable</code> to <code>true</code>, the sink interprets the contents of the message and you can define a <strong>primary key</strong> that is in turn used as the special <code>_id</code> field in Elasticsearch.</td></tr>
</tbody>
</table>
<p>Schema-aware mode allows you to perform <code>UPDATE</code>, <code>INSERT</code>, and <code>DELETE</code> operations
on Elasticsearch driven by the logical primary key of the message.<br><br> This
is useful in a typical Change Data Capture scenario in which you follow the
changes on your database, write them to Pulsar (using the Debezium adapter, for
instance), and then write them to Elasticsearch.<br><br> You configure the
mapping of the primary key using the <code>primaryFields</code> configuration
entry.<br><br>The <code>DELETE</code> operation can be performed when the primary key is
not empty and the remaining value is empty. Use the <code>nullValueAction</code> property to
configure this behavior. By default, the connector ignores such empty
values.</p>
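<p>As a rough illustration of the schema-aware setup described above, the following YAML sketch enables schema handling, builds the document <code>_id</code> from a single field, and deletes the document when a record arrives with an empty value. The URL, the index name, and the field name <code>id</code> are placeholder assumptions. Whether the record key is taken into account also depends on the topic schema and on <code>keyIgnore</code>, which is not shown here.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>  <span class="hljs-comment"># placeholder URL</span>
    <span class="hljs-attr">indexName:</span> <span class="hljs-string">"my_index"</span>                      <span class="hljs-comment"># placeholder index name</span>
    <span class="hljs-attr">schemaEnable:</span> <span class="hljs-literal">true</span>                         <span class="hljs-comment"># turn on schema-aware mode</span>
    <span class="hljs-attr">primaryFields:</span> <span class="hljs-string">"id"</span>                        <span class="hljs-comment"># assumed field used to build _id</span>
    <span class="hljs-attr">nullValueAction:</span> <span class="hljs-string">"DELETE"</span>                  <span class="hljs-comment"># delete instead of the default IGNORE</span>
</code></pre>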
<h3><a class="anchor" aria-hidden="true" id="map-multiple-indexes"></a><a href="#map-multiple-indexes" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Map multiple indexes</h3>
<p>Since Pulsar 2.9.0, the <code>indexName</code> property is no longer required. If you omit it, the sink writes to an index named after the Pulsar topic.</p>
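<p>A minimal sketch of a configuration that relies on this behavior simply leaves <code>indexName</code> out. The URL is a placeholder assumption. With several input topics, each topic would then map to its own index, although the exact index name derived from a topic name is not spelled out here.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-comment"># indexName is intentionally omitted, so the index follows the topic name</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>  <span class="hljs-comment"># placeholder URL</span>
</code></pre>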
<h3><a class="anchor" aria-hidden="true" id="enable-bulk-writes"></a><a href="#enable-bulk-writes" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Enable bulk writes</h3>
<p>Since Pulsar 2.9.0, you can use bulk writes by setting the <code>bulkEnabled</code> property to <code>true</code>.</p>
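<p>For example, a bulk-write configuration might look like the following sketch. It combines <code>bulkEnabled</code> with the related bulk properties; the URL, the index name, and the flush interval of one second are illustrative assumptions, not recommended settings.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>  <span class="hljs-comment"># placeholder URL</span>
    <span class="hljs-attr">indexName:</span> <span class="hljs-string">"my_index"</span>                      <span class="hljs-comment"># placeholder index name</span>
    <span class="hljs-attr">bulkEnabled:</span> <span class="hljs-literal">true</span>                          <span class="hljs-comment"># use the bulk processor</span>
    <span class="hljs-attr">bulkActions:</span> <span class="hljs-number">1000</span>                          <span class="hljs-comment"># flush after this many actions (default)</span>
    <span class="hljs-attr">bulkSizeInMb:</span> <span class="hljs-number">5</span>                            <span class="hljs-comment"># flush after this many megabytes (default)</span>
    <span class="hljs-attr">bulkFlushIntervalInMs:</span> <span class="hljs-number">1000</span>                <span class="hljs-comment"># assumed value; the default -1 means not set</span>
</code></pre>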
<h3><a class="anchor" aria-hidden="true" id="enable-secure-connections-via-tls"></a><a href="#enable-secure-connections-via-tls" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Enable secure connections via TLS</h3>
<p>Since Pulsar 2.9.0, you can enable secure connections with TLS through the <code>ssl</code> configuration entry.</p>
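<p>As a sketch, a TLS-enabled configuration could nest the <code>ElasticSearchSslConfig</code> properties under <code>ssl</code> as shown below. The <code>https</code> URL, the truststore path, and the password are hypothetical placeholders to replace with your own values.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"https://localhost:9200"</span>        <span class="hljs-comment"># placeholder URL</span>
    <span class="hljs-attr">ssl:</span>
        <span class="hljs-attr">enabled:</span> <span class="hljs-literal">true</span>
        <span class="hljs-attr">hostnameVerification:</span> <span class="hljs-literal">true</span>                  <span class="hljs-comment"># default</span>
        <span class="hljs-attr">truststorePath:</span> <span class="hljs-string">"/path/to/truststore.jks"</span>   <span class="hljs-comment"># hypothetical path</span>
        <span class="hljs-attr">truststorePassword:</span> <span class="hljs-string">"changeit"</span>              <span class="hljs-comment"># hypothetical password</span>
        <span class="hljs-attr">protocols:</span> <span class="hljs-string">"TLSv1.2"</span>                        <span class="hljs-comment"># default</span>
</code></pre>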
<h2><a class="anchor" aria-hidden="true" id="configuration"></a><a href="#configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h2>
<p>The configuration of the Elasticsearch sink connector has the following properties.</p>
<h3><a class="anchor" aria-hidden="true" id="property"></a><a href="#property" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Property</h3>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th>Required</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>elasticSearchUrl</code></td><td>String</td><td>true</td><td>&quot; &quot; (empty string)</td><td>The URL of the Elasticsearch cluster to which the connector connects.</td></tr>
<tr><td><code>indexName</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The index name to which the connector writes messages. If omitted, the connector writes to an index named after the Pulsar topic.</td></tr>
<tr><td><code>schemaEnable</code></td><td>Boolean</td><td>false</td><td>false</td><td>Turn on the Schema Aware mode.</td></tr>
<tr><td><code>createIndexIfNeeded</code></td><td>Boolean</td><td>false</td><td>false</td><td>Create the index if it is missing.</td></tr>
<tr><td><code>maxRetries</code></td><td>Integer</td><td>false</td><td>1</td><td>The maximum number of retries for Elasticsearch requests. Use -1 to disable it.</td></tr>
<tr><td><code>retryBackoffInMs</code></td><td>Integer</td><td>false</td><td>100</td><td>The base time to wait when retrying an Elasticsearch request (in milliseconds).</td></tr>
<tr><td><code>maxRetryTimeInSec</code></td><td>Integer</td><td>false</td><td>86400</td><td>The maximum retry time interval in seconds for retrying an Elasticsearch request.</td></tr>
<tr><td><code>bulkEnabled</code></td><td>Boolean</td><td>false</td><td>false</td><td>Enable the Elasticsearch bulk processor to flush write requests based on the number or size of requests, or after a given period.</td></tr>
<tr><td><code>bulkActions</code></td><td>Integer</td><td>false</td><td>1000</td><td>The maximum number of actions per Elasticsearch bulk request. Use -1 to disable it.</td></tr>
<tr><td><code>bulkSizeInMb</code></td><td>Integer</td><td>false</td><td>5</td><td>The maximum size in megabytes of Elasticsearch bulk requests. Use -1 to disable it.</td></tr>
<tr><td><code>bulkConcurrentRequests</code></td><td>Integer</td><td>false</td><td>0</td><td>The maximum number of in-flight Elasticsearch bulk requests. The default 0 allows the execution of a single request. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.</td></tr>
<tr><td><code>bulkFlushIntervalInMs</code></td><td>Integer</td><td>false</td><td>-1</td><td>The maximum period of time to wait for flushing pending writes when bulk writes are enabled. The default, -1, means that no interval is set.</td></tr>
<tr><td><code>compressionEnabled</code></td><td>Boolean</td><td>false</td><td>false</td><td>Enable Elasticsearch request compression.</td></tr>
<tr><td><code>connectTimeoutInMs</code></td><td>Integer</td><td>false</td><td>5000</td><td>The Elasticsearch client connection timeout in milliseconds.</td></tr>
<tr><td><code>connectionRequestTimeoutInMs</code></td><td>Integer</td><td>false</td><td>1000</td><td>The time in milliseconds for getting a connection from the Elasticsearch connection pool.</td></tr>
<tr><td><code>connectionIdleTimeoutInMs</code></td><td>Integer</td><td>false</td><td>5</td><td>Idle connection timeout to prevent a read timeout.</td></tr>
<tr><td><code>keyIgnore</code></td><td>Boolean</td><td>false</td><td>true</td><td>Whether to ignore the record key when building the Elasticsearch document <code>_id</code>. If <code>primaryFields</code> is defined, the connector extracts the primary fields from the payload to build the document <code>_id</code>. If no <code>primaryFields</code> are provided, Elasticsearch auto-generates a random document <code>_id</code>.</td></tr>
<tr><td><code>primaryFields</code></td><td>String</td><td>false</td><td>&quot;id&quot;</td><td>The comma-separated ordered list of field names used to build the Elasticsearch document <code>_id</code> from the record value. If this list is a singleton, the field is converted to a string. If this list has 2 or more fields, the generated <code>_id</code> is a string representation of a JSON array of the field values.</td></tr>
<tr><td><code>nullValueAction</code></td><td>enum (IGNORE,DELETE,FAIL)</td><td>false</td><td>IGNORE</td><td>How to handle records with null values. Possible options are IGNORE, DELETE, or FAIL. The default is IGNORE, which ignores the message.</td></tr>
<tr><td><code>malformedDocAction</code></td><td>enum (IGNORE,WARN,FAIL)</td><td>false</td><td>FAIL</td><td>How to handle documents that Elasticsearch rejects because they are malformed. Possible options are IGNORE, WARN, or FAIL. The default is FAIL.</td></tr>
<tr><td><code>stripNulls</code></td><td>Boolean</td><td>false</td><td>true</td><td>If <code>stripNulls</code> is false, the Elasticsearch <code>_source</code> includes 'null' for empty fields (for example {&quot;foo&quot;: null}); otherwise, null fields are stripped.</td></tr>
<tr><td><code>socketTimeoutInMs</code></td><td>Integer</td><td>false</td><td>60000</td><td>The socket timeout in milliseconds waiting to read the Elasticsearch response.</td></tr>
<tr><td><code>typeName</code></td><td>String</td><td>false</td><td>&quot;_doc&quot;</td><td>The type name to which the connector writes messages. <br><br> Set the value explicitly to a valid type name other than &quot;_doc&quot; for Elasticsearch versions before 6.2, and leave it at the default otherwise.</td></tr>
<tr><td><code>indexNumberOfShards</code></td><td>Integer</td><td>false</td><td>1</td><td>The number of shards of the index.</td></tr>
<tr><td><code>indexNumberOfReplicas</code></td><td>Integer</td><td>false</td><td>1</td><td>The number of replicas of the index.</td></tr>
<tr><td><code>username</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The username used by the connector to connect to the Elasticsearch cluster. <br><br>If <code>username</code> is set, then <code>password</code> should also be provided.</td></tr>
<tr><td><code>password</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The password used by the connector to connect to the Elasticsearch cluster. <br><br>If <code>username</code> is set, then <code>password</code> should also be provided.</td></tr>
<tr><td><code>ssl</code></td><td>ElasticSearchSslConfig</td><td>false</td><td></td><td>Configuration for TLS-encrypted communication.</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="definition-of-elasticsearchsslconfig-structure"></a><a href="#definition-of-elasticsearchsslconfig-structure" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Definition of ElasticSearchSslConfig structure:</h3>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th>Required</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>enabled</code></td><td>Boolean</td><td>false</td><td>false</td><td>Enable SSL/TLS.</td></tr>
<tr><td><code>hostnameVerification</code></td><td>Boolean</td><td>false</td><td>true</td><td>Whether or not to validate node hostnames when using SSL.</td></tr>
<tr><td><code>truststorePath</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The path to the truststore file.</td></tr>
<tr><td><code>truststorePassword</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>Truststore password.</td></tr>
<tr><td><code>keystorePath</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The path to the keystore file.</td></tr>
<tr><td><code>keystorePassword</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>Keystore password.</td></tr>
<tr><td><code>cipherSuites</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>SSL/TLS cipher suites.</td></tr>
<tr><td><code>protocols</code></td><td>String</td><td>false</td><td>&quot;TLSv1.2&quot;</td><td>Comma-separated list of enabled SSL/TLS protocols.</td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="example"></a><a href="#example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example</h2>
<p>Before using the Elasticsearch sink connector, you need to create a configuration file through one of the following methods.</p>
<h3><a class="anchor" aria-hidden="true" id="configuration-1"></a><a href="#configuration-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h3>
<h4><a class="anchor" aria-hidden="true" id="for-elasticsearch-after-62"></a><a href="#for-elasticsearch-after-62" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>For Elasticsearch After 6.2</h4>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"elasticSearchUrl"</span>: <span class="hljs-string">"http://localhost:9200"</span>,
<span class="hljs-attr">"indexName"</span>: <span class="hljs-string">"my_index"</span>,
<span class="hljs-attr">"username"</span>: <span class="hljs-string">"scooby"</span>,
<span class="hljs-attr">"password"</span>: <span class="hljs-string">"doobie"</span>
}
</code></pre></li>
<li><p>YAML</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>
    <span class="hljs-attr">indexName:</span> <span class="hljs-string">"my_index"</span>
    <span class="hljs-attr">username:</span> <span class="hljs-string">"scooby"</span>
    <span class="hljs-attr">password:</span> <span class="hljs-string">"doobie"</span>
</code></pre></li>
</ul>
<h4><a class="anchor" aria-hidden="true" id="for-elasticsearch-before-62"></a><a href="#for-elasticsearch-before-62" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>For Elasticsearch Before 6.2</h4>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"elasticSearchUrl"</span>: <span class="hljs-string">"http://localhost:9200"</span>,
<span class="hljs-attr">"indexName"</span>: <span class="hljs-string">"my_index"</span>,
<span class="hljs-attr">"typeName"</span>: <span class="hljs-string">"doc"</span>,
<span class="hljs-attr">"username"</span>: <span class="hljs-string">"scooby"</span>,
<span class="hljs-attr">"password"</span>: <span class="hljs-string">"doobie"</span>
}
</code></pre></li>
<li><p>YAML</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>
    <span class="hljs-attr">indexName:</span> <span class="hljs-string">"my_index"</span>
    <span class="hljs-attr">typeName:</span> <span class="hljs-string">"doc"</span>
    <span class="hljs-attr">username:</span> <span class="hljs-string">"scooby"</span>
    <span class="hljs-attr">password:</span> <span class="hljs-string">"doobie"</span>
</code></pre></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="usage"></a><a href="#usage" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Usage</h3>
<ol>
<li><p>Start a single node Elasticsearch cluster.</p>
<pre><code class="hljs css language-bash">$ docker run -p 9200:9200 -p 9300:9300 \
-e <span class="hljs-string">"discovery.type=single-node"</span> \
docker.elastic.co/elasticsearch/elasticsearch:7.13.3
</code></pre></li>
<li><p>Start a Pulsar service locally in standalone mode.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar standalone
</code></pre>
<p>Make sure the NAR file is available at <code>connectors/pulsar-io-elastic-search-2.9.0.nar</code>.</p></li>
<li><p>Start the Pulsar Elasticsearch connector in local run mode using one of the following methods.</p>
<ul>
<li><p>Use the <strong>JSON</strong> configuration as shown previously.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks localrun \
--archive connectors/pulsar-io-elastic-search-2.9.0.nar \
--tenant public \
--namespace default \
--name elasticsearch-test-sink \
--sink-config <span class="hljs-string">'{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "scooby","password": "doobie"}'</span> \
--inputs elasticsearch_test
</code></pre></li>
<li><p>Use the <strong>YAML</strong> configuration file as shown previously.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks localrun \
--archive connectors/pulsar-io-elastic-search-2.9.0.nar \
--tenant public \
--namespace default \
--name elasticsearch-test-sink \
--sink-config-file elasticsearch-sink.yml \
--inputs elasticsearch_test
</code></pre></li>
</ul></li>
<li><p>Publish records to the topic.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-client produce elasticsearch_test --messages <span class="hljs-string">"{\"a\":1}"</span>
</code></pre></li>
<li><p>Check documents in Elasticsearch.</p>
<ul>
<li>Refresh the index.
<pre><code class="hljs css language-bash">$ curl -s http://localhost:9200/my_index/_refresh
</code></pre></li>
<li>Search documents.
<pre><code class="hljs css language-bash">$ curl -s http://localhost:9200/my_index/_search
</code></pre>
You can see that the record published earlier has been successfully written to Elasticsearch.
<pre><code class="hljs css language-json">{<span class="hljs-attr">"took"</span>:<span class="hljs-number">2</span>,<span class="hljs-attr">"timed_out"</span>:<span class="hljs-literal">false</span>,<span class="hljs-attr">"_shards"</span>:{<span class="hljs-attr">"total"</span>:<span class="hljs-number">1</span>,<span class="hljs-attr">"successful"</span>:<span class="hljs-number">1</span>,<span class="hljs-attr">"skipped"</span>:<span class="hljs-number">0</span>,<span class="hljs-attr">"failed"</span>:<span class="hljs-number">0</span>},<span class="hljs-attr">"hits"</span>:{<span class="hljs-attr">"total"</span>:{<span class="hljs-attr">"value"</span>:<span class="hljs-number">1</span>,<span class="hljs-attr">"relation"</span>:<span class="hljs-string">"eq"</span>},<span class="hljs-attr">"max_score"</span>:<span class="hljs-number">1.0</span>,<span class="hljs-attr">"hits"</span>:[{<span class="hljs-attr">"_index"</span>:<span class="hljs-string">"my_index"</span>,<span class="hljs-attr">"_type"</span>:<span class="hljs-string">"_doc"</span>,<span class="hljs-attr">"_id"</span>:<span class="hljs-string">"FSxemm8BLjG_iC0EeTYJ"</span>,<span class="hljs-attr">"_score"</span>:<span class="hljs-number">1.0</span>,<span class="hljs-attr">"_source"</span>:{<span class="hljs-attr">"a"</span>:<span class="hljs-number">1</span>}}]}}
</code></pre></li>
</ul></li>
</ol>
</span></div></article></div><div class="docs-prevnext"></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#feature">Feature</a><ul class="toc-headings"><li><a href="#handle-data">Handle data</a></li><li><a href="#map-multiple-indexes">Map multiple indexes</a></li><li><a href="#enable-bulk-writes">Enable bulk writes</a></li><li><a href="#enable-secure-connections-via-tls">Enable secure connections via TLS</a></li></ul></li><li><a href="#configuration">Configuration</a><ul class="toc-headings"><li><a href="#property">Property</a></li><li><a href="#definition-of-elasticsearchsslconfig-structure">Definition of ElasticSearchSslConfig structure:</a></li></ul></li><li><a href="#example">Example</a><ul class="toc-headings"><li><a href="#configuration-1">Configuration</a></li><li><a href="#usage">Usage</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>