<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Elasticsearch sink connector · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="The Elasticsearch sink connector pulls messages from Pulsar topics and persists the messages to indexes."/><meta name="docsearch:version" content="2.10.0"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Elasticsearch sink connector · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="The Elasticsearch sink connector pulls messages from Pulsar topics and persists the messages to indexes."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a 
href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.10.0</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/docs/en/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class=""><a href="/docs/en/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/io-elasticsearch-sink">日本語</a></li><li><a href="/docs/fr/io-elasticsearch-sink">Français</a></li><li><a href="/docs/ko/io-elasticsearch-sink">한국어</a></li><li><a href="/docs/zh-CN/io-elasticsearch-sink">中文</a></li><li><a href="/docs/zh-TW/io-elasticsearch-sink">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><h1 id="__docusaurus" class="postHeaderTitle">Elasticsearch sink connector</h1></header><article><div><span><p>The Elasticsearch sink connector pulls messages from Pulsar topics and persists the messages to indexes.</p>
<h2><a class="anchor" aria-hidden="true" id="feature"></a><a href="#feature" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Feature</h2>
<h3><a class="anchor" aria-hidden="true" id="handle-data"></a><a href="#handle-data" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Handle data</h3>
<p>Since Pulsar 2.9.0, the Elasticsearch sink connector can handle messages in
one of the following two modes.</p>
<table>
<thead>
<tr><th>Name</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td>Raw processing</td><td>The sink reads from topics and passes the raw content to Elasticsearch. <br><br> This is the <strong>default</strong> behavior. <br><br> Raw processing was already available <strong>in Pulsar 2.8.x</strong>.</td></tr>
<tr><td>Schema aware</td><td>The sink uses the schema and handles AVRO, JSON, and KeyValue schema types while mapping the content to the Elasticsearch document.<br><br> If you set <code>schemaEnable</code> to <code>true</code>, the sink interprets the contents of the message and you can define a <strong>primary key</strong> that is in turn used as the special <code>_id</code> field on Elasticsearch.</td></tr>
</tbody>
</table>
<p>In schema-aware mode, the primary key lets you perform <code>UPDATE</code>, <code>INSERT</code>, and <code>DELETE</code> operations
on Elasticsearch, driven by the logical primary key of the message.<br><br> This
is useful in a typical Change Data Capture (CDC) scenario, in which you follow the
changes on your database, write them to Pulsar (using the Debezium adapter, for
instance), and then write them to Elasticsearch.<br><br> You configure the
mapping of the primary key using the <code>primaryFields</code> configuration
entry.<br><br>The <code>DELETE</code> operation can be performed when the primary key is
not empty and the remaining value is empty. Use the <code>nullValueAction</code> property to
configure this behavior. The default configuration ignores such empty
values.</p>
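<p>As a minimal sketch of the schema-aware mode, a sink configuration along the following lines turns on schema handling, builds the document <code>_id</code> from a single field, and deletes documents when a record arrives with an empty value. It uses only properties listed in the property table on this page. The URL, the index name, and the <code>id</code> field name are illustrative assumptions, and other settings (such as <code>keyIgnore</code>) may also need adjusting depending on where your primary key is carried.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>   <span class="hljs-comment"># assumed local cluster</span>
    <span class="hljs-attr">indexName:</span> <span class="hljs-string">"my_index"</span>                       <span class="hljs-comment"># illustrative index name</span>
    <span class="hljs-attr">schemaEnable:</span> <span class="hljs-literal">true</span>                          <span class="hljs-comment"># turn on the schema-aware mode</span>
    <span class="hljs-attr">primaryFields:</span> <span class="hljs-string">"id"</span>                         <span class="hljs-comment"># assumed field name used to build _id</span>
    <span class="hljs-attr">nullValueAction:</span> <span class="hljs-string">"DELETE"</span>                   <span class="hljs-comment"># the default is IGNORE</span>
</code></pre>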
<h3><a class="anchor" aria-hidden="true" id="map-multiple-indexes"></a><a href="#map-multiple-indexes" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Map multiple indexes</h3>
<p>Since Pulsar 2.9.0, the <code>indexName</code> property is no longer required. If you omit it, the sink writes to an index named after the Pulsar topic.</p>
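<p>The two ways of choosing the index can be sketched as follows. The URL and the <code>logs-</code> prefix are illustrative assumptions; the date pattern is the one given in the description of <code>indexName</code> in the property table.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>   <span class="hljs-comment"># assumed local cluster</span>
    <span class="hljs-comment"># Option 1: leave indexName out, so the index is named after the topic.</span>
    <span class="hljs-comment"># Option 2: derive the index name from the event time of each record:</span>
    <span class="hljs-attr">indexName:</span> <span class="hljs-string">"logs-%{+yyyy-MM-dd}"</span>
</code></pre>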
<h3><a class="anchor" aria-hidden="true" id="enable-bulk-writes"></a><a href="#enable-bulk-writes" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Enable bulk writes</h3>
<p>Since Pulsar 2.9.0, you can use bulk writes by setting the <code>bulkEnabled</code> property to <code>true</code>.</p>
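<p>As a rough illustration, bulk writes could be tuned with the bulk-related properties from the property table. The values below are arbitrary examples, not recommendations, and the URL is an assumption.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>   <span class="hljs-comment"># assumed local cluster</span>
    <span class="hljs-attr">bulkEnabled:</span> <span class="hljs-literal">true</span>
    <span class="hljs-attr">bulkActions:</span> <span class="hljs-number">500</span>              <span class="hljs-comment"># at most 500 actions per bulk request (default 1000)</span>
    <span class="hljs-attr">bulkSizeInMb:</span> <span class="hljs-number">5</span>               <span class="hljs-comment"># at most 5 MB per bulk request (the default)</span>
    <span class="hljs-attr">bulkFlushIntervalInMs:</span> <span class="hljs-number">1000</span>   <span class="hljs-comment"># wait at most 1 second before flushing (default -1, not set)</span>
</code></pre>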
<h3><a class="anchor" aria-hidden="true" id="enable-secure-connections-via-tls"></a><a href="#enable-secure-connections-via-tls" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Enable secure connections via TLS</h3>
<p>Since Pulsar 2.9.0, you can enable secure connections with TLS by using the <code>ssl</code> property.</p>
<h2><a class="anchor" aria-hidden="true" id="configuration"></a><a href="#configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h2>
<p>The configuration of the Elasticsearch sink connector has the following properties.</p>
<h3><a class="anchor" aria-hidden="true" id="property"></a><a href="#property" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Property</h3>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th>Required</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>elasticSearchUrl</code></td><td>String</td><td>true</td><td>&quot; &quot; (empty string)</td><td>The URL of the Elasticsearch cluster to which the connector connects.</td></tr>
<tr><td><code>indexName</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The name of the index to which the connector writes messages. The default value is the topic name. The name accepts a date format, using the pattern <code>%{+&lt;date-format&gt;}</code>, to support indexes based on event time. For example, if the event time of the record is 1645182000000L and <code>indexName</code> is <code>logs-%{+yyyy-MM-dd}</code>, the formatted index name is <code>logs-2022-02-18</code>.</td></tr>
<tr><td><code>schemaEnable</code></td><td>Boolean</td><td>false</td><td>false</td><td>Turn on the Schema Aware mode.</td></tr>
<tr><td><code>createIndexIfNeeded</code></td><td>Boolean</td><td>false</td><td>false</td><td>Create the index if it is missing.</td></tr>
<tr><td><code>maxRetries</code></td><td>Integer</td><td>false</td><td>1</td><td>The maximum number of retries for Elasticsearch requests. Use -1 to disable it.</td></tr>
<tr><td><code>retryBackoffInMs</code></td><td>Integer</td><td>false</td><td>100</td><td>The base time to wait when retrying an Elasticsearch request (in milliseconds).</td></tr>
<tr><td><code>maxRetryTimeInSec</code></td><td>Integer</td><td>false</td><td>86400</td><td>The maximum retry time interval in seconds for retrying an Elasticsearch request.</td></tr>
<tr><td><code>bulkEnabled</code></td><td>Boolean</td><td>false</td><td>false</td><td>Enable the Elasticsearch bulk processor to flush write requests based on the number or size of requests, or after a given period.</td></tr>
<tr><td><code>bulkActions</code></td><td>Integer</td><td>false</td><td>1000</td><td>The maximum number of actions per Elasticsearch bulk request. Use -1 to disable it.</td></tr>
<tr><td><code>bulkSizeInMb</code></td><td>Integer</td><td>false</td><td>5</td><td>The maximum size in megabytes of Elasticsearch bulk requests. Use -1 to disable it.</td></tr>
<tr><td><code>bulkConcurrentRequests</code></td><td>Integer</td><td>false</td><td>0</td><td>The maximum number of in-flight Elasticsearch bulk requests. The default 0 allows the execution of a single request. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.</td></tr>
<tr><td><code>bulkFlushIntervalInMs</code></td><td>Integer</td><td>false</td><td>-1</td><td>The maximum period of time to wait for flushing pending writes when bulk writes are enabled. Default is -1 meaning not set.</td></tr>
<tr><td><code>compressionEnabled</code></td><td>Boolean</td><td>false</td><td>false</td><td>Enable Elasticsearch request compression.</td></tr>
<tr><td><code>connectTimeoutInMs</code></td><td>Integer</td><td>false</td><td>5000</td><td>The Elasticsearch client connection timeout in milliseconds.</td></tr>
<tr><td><code>connectionRequestTimeoutInMs</code></td><td>Integer</td><td>false</td><td>1000</td><td>The time in milliseconds for getting a connection from the Elasticsearch connection pool.</td></tr>
<tr><td><code>connectionIdleTimeoutInMs</code></td><td>Integer</td><td>false</td><td>5</td><td>Idle connection timeout to prevent a read timeout.</td></tr>
<tr><td><code>keyIgnore</code></td><td>Boolean</td><td>false</td><td>true</td><td>Whether to ignore the record key when building the Elasticsearch document <code>_id</code>. If <code>primaryFields</code> is defined, the connector extracts the primary fields from the payload to build the document <code>_id</code>. If no <code>primaryFields</code> are provided, Elasticsearch auto-generates a random document <code>_id</code>.</td></tr>
<tr><td><code>primaryFields</code></td><td>String</td><td>false</td><td>&quot;id&quot;</td><td>The comma-separated, ordered list of field names used to build the Elasticsearch document <code>_id</code> from the record value. If this list is a singleton, the field is converted to a string. If this list has 2 or more fields, the generated <code>_id</code> is a string representation of a JSON array of the field values.</td></tr>
<tr><td><code>nullValueAction</code></td><td>enum (IGNORE,DELETE,FAIL)</td><td>false</td><td>IGNORE</td><td>How to handle records with null values. Possible options are IGNORE, DELETE, or FAIL. The default is to IGNORE the message.</td></tr>
<tr><td><code>malformedDocAction</code></td><td>enum (IGNORE,WARN,FAIL)</td><td>false</td><td>FAIL</td><td>How to handle documents that Elasticsearch rejects because they are malformed. Possible options are IGNORE, WARN, or FAIL. The default is to FAIL the Elasticsearch document.</td></tr>
<tr><td><code>stripNulls</code></td><td>Boolean</td><td>false</td><td>true</td><td>If <code>stripNulls</code> is false, the Elasticsearch <code>_source</code> includes 'null' for empty fields (for example {&quot;foo&quot;: null}); otherwise, null fields are stripped.</td></tr>
<tr><td><code>socketTimeoutInMs</code></td><td>Integer</td><td>false</td><td>60000</td><td>The socket timeout in milliseconds when waiting to read the Elasticsearch response.</td></tr>
<tr><td><code>typeName</code></td><td>String</td><td>false</td><td>&quot;_doc&quot;</td><td>The type name to which the connector writes messages. <br><br> Set the value explicitly to a valid type name other than &quot;_doc&quot; for Elasticsearch versions before 6.2, and leave it at the default otherwise.</td></tr>
<tr><td><code>indexNumberOfShards</code></td><td>Integer</td><td>false</td><td>1</td><td>The number of shards of the index.</td></tr>
<tr><td><code>indexNumberOfReplicas</code></td><td>Integer</td><td>false</td><td>1</td><td>The number of replicas of the index.</td></tr>
<tr><td><code>username</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The username used by the connector to connect to the Elasticsearch cluster. <br><br>If <code>username</code> is set, then <code>password</code> should also be provided.</td></tr>
<tr><td><code>password</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The password used by the connector to connect to the Elasticsearch cluster. <br><br>If <code>username</code> is set, then <code>password</code> should also be provided.</td></tr>
<tr><td><code>ssl</code></td><td>ElasticSearchSslConfig</td><td>false</td><td></td><td>Configuration for TLS encrypted communication.</td></tr>
</tbody>
</table>
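<p>As an illustration of how the retry and timeout properties fit together, the following sketch overrides a few of the defaults listed above. The values are arbitrary examples rather than recommendations, and the URL is an assumption.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>   <span class="hljs-comment"># assumed local cluster</span>
    <span class="hljs-attr">maxRetries:</span> <span class="hljs-number">5</span>                 <span class="hljs-comment"># default is 1</span>
    <span class="hljs-attr">retryBackoffInMs:</span> <span class="hljs-number">200</span>         <span class="hljs-comment"># base wait time between retries (default 100)</span>
    <span class="hljs-attr">maxRetryTimeInSec:</span> <span class="hljs-number">60</span>         <span class="hljs-comment"># default is 86400</span>
    <span class="hljs-attr">connectTimeoutInMs:</span> <span class="hljs-number">5000</span>      <span class="hljs-comment"># the default</span>
    <span class="hljs-attr">socketTimeoutInMs:</span> <span class="hljs-number">30000</span>      <span class="hljs-comment"># default is 60000</span>
</code></pre>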
<h3><a class="anchor" aria-hidden="true" id="definition-of-elasticsearchsslconfig-structure"></a><a href="#definition-of-elasticsearchsslconfig-structure" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Definition of ElasticSearchSslConfig structure:</h3>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th>Required</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>enabled</code></td><td>Boolean</td><td>false</td><td>false</td><td>Enable SSL/TLS.</td></tr>
<tr><td><code>hostnameVerification</code></td><td>Boolean</td><td>false</td><td>true</td><td>Whether or not to validate node hostnames when using SSL.</td></tr>
<tr><td><code>truststorePath</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The path to the truststore file.</td></tr>
<tr><td><code>truststorePassword</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>Truststore password.</td></tr>
<tr><td><code>keystorePath</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The path to the keystore file.</td></tr>
<tr><td><code>keystorePassword</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>Keystore password.</td></tr>
<tr><td><code>cipherSuites</code></td><td>String</td><td>false</td><td>&quot; &quot; (empty string)</td><td>SSL/TLS cipher suites.</td></tr>
<tr><td><code>protocols</code></td><td>String</td><td>false</td><td>&quot;TLSv1.2&quot;</td><td>Comma-separated list of enabled SSL/TLS protocols.</td></tr>
</tbody>
</table>
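<p>A TLS-enabled configuration might look like the following sketch, which nests the <code>ElasticSearchSslConfig</code> fields under the <code>ssl</code> property. The <code>https</code> URL, the truststore path, and the password are hypothetical placeholders.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"https://localhost:9200"</span>           <span class="hljs-comment"># assumed TLS endpoint</span>
    <span class="hljs-attr">ssl:</span>
        <span class="hljs-attr">enabled:</span> <span class="hljs-literal">true</span>
        <span class="hljs-attr">hostnameVerification:</span> <span class="hljs-literal">true</span>                   <span class="hljs-comment"># the default</span>
        <span class="hljs-attr">truststorePath:</span> <span class="hljs-string">"/path/to/truststore.jks"</span>    <span class="hljs-comment"># hypothetical path</span>
        <span class="hljs-attr">truststorePassword:</span> <span class="hljs-string">"changeit"</span>               <span class="hljs-comment"># hypothetical password</span>
        <span class="hljs-attr">protocols:</span> <span class="hljs-string">"TLSv1.2"</span>                         <span class="hljs-comment"># the default</span>
</code></pre>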
<h2><a class="anchor" aria-hidden="true" id="example"></a><a href="#example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example</h2>
<p>Before using the Elasticsearch sink connector, you need to create a configuration file through one of the following methods.</p>
<h3><a class="anchor" aria-hidden="true" id="configuration-1"></a><a href="#configuration-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h3>
<h4><a class="anchor" aria-hidden="true" id="for-elasticsearch-after-62"></a><a href="#for-elasticsearch-after-62" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>For Elasticsearch After 6.2</h4>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
    <span class="hljs-attr">"configs"</span>: {
        <span class="hljs-attr">"elasticSearchUrl"</span>: <span class="hljs-string">"http://localhost:9200"</span>,
        <span class="hljs-attr">"indexName"</span>: <span class="hljs-string">"my_index"</span>,
        <span class="hljs-attr">"username"</span>: <span class="hljs-string">"scooby"</span>,
        <span class="hljs-attr">"password"</span>: <span class="hljs-string">"doobie"</span>
    }
}
</code></pre></li>
<li><p>YAML</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>
    <span class="hljs-attr">indexName:</span> <span class="hljs-string">"my_index"</span>
    <span class="hljs-attr">username:</span> <span class="hljs-string">"scooby"</span>
    <span class="hljs-attr">password:</span> <span class="hljs-string">"doobie"</span>
</code></pre></li>
</ul>
<h4><a class="anchor" aria-hidden="true" id="for-elasticsearch-before-62"></a><a href="#for-elasticsearch-before-62" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>For Elasticsearch Before 6.2</h4>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
    <span class="hljs-attr">"configs"</span>: {
        <span class="hljs-attr">"elasticSearchUrl"</span>: <span class="hljs-string">"http://localhost:9200"</span>,
        <span class="hljs-attr">"indexName"</span>: <span class="hljs-string">"my_index"</span>,
        <span class="hljs-attr">"typeName"</span>: <span class="hljs-string">"doc"</span>,
        <span class="hljs-attr">"username"</span>: <span class="hljs-string">"scooby"</span>,
        <span class="hljs-attr">"password"</span>: <span class="hljs-string">"doobie"</span>
    }
}
</code></pre></li>
<li><p>YAML</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">elasticSearchUrl:</span> <span class="hljs-string">"http://localhost:9200"</span>
    <span class="hljs-attr">indexName:</span> <span class="hljs-string">"my_index"</span>
    <span class="hljs-attr">typeName:</span> <span class="hljs-string">"doc"</span>
    <span class="hljs-attr">username:</span> <span class="hljs-string">"scooby"</span>
    <span class="hljs-attr">password:</span> <span class="hljs-string">"doobie"</span>
</code></pre></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="usage"></a><a href="#usage" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Usage</h3>
<ol>
<li><p>Start a single node Elasticsearch cluster.</p>
<pre><code class="hljs css language-bash">$ docker run -p 9200:9200 -p 9300:9300 \
-e <span class="hljs-string">"discovery.type=single-node"</span> \
docker.elastic.co/elasticsearch/elasticsearch:7.13.3
</code></pre></li>
<li><p>Start a Pulsar service locally in standalone mode.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar standalone
</code></pre>
<p>Make sure the NAR file is available at <code>connectors/pulsar-io-elastic-search-2.10.0.nar</code>.</p></li>
<li><p>Start the Pulsar Elasticsearch connector in local run mode using one of the following methods.</p>
<ul>
<li><p>Use the <strong>JSON</strong> configuration as shown previously.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks localrun \
--archive connectors/pulsar-io-elastic-search-2.10.0.nar \
--tenant public \
--namespace default \
--name elasticsearch-test-sink \
--sink-config <span class="hljs-string">'{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "scooby","password": "doobie"}'</span> \
--inputs elasticsearch_test
</code></pre></li>
<li><p>Use the <strong>YAML</strong> configuration file as shown previously.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks localrun \
--archive connectors/pulsar-io-elastic-search-2.10.0.nar \
--tenant public \
--namespace default \
--name elasticsearch-test-sink \
--sink-config-file elasticsearch-sink.yml \
--inputs elasticsearch_test
</code></pre></li>
</ul></li>
<li><p>Publish records to the topic.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-client produce elasticsearch_test --messages <span class="hljs-string">"{\"a\":1}"</span>
</code></pre></li>
<li><p>Check documents in Elasticsearch.</p>
<ul>
<li>Refresh the index.
<pre><code class="hljs css language-bash"> $ curl -s http://localhost:9200/my_index/_refresh
</code></pre></li>
<li>Search for documents.
<pre><code class="hljs css language-bash"> $ curl -s http://localhost:9200/my_index/_search
</code></pre>
You can see that the record published earlier has been successfully written to Elasticsearch.
<pre><code class="hljs css language-json">{<span class="hljs-attr">"took"</span>:<span class="hljs-number">2</span>,<span class="hljs-attr">"timed_out"</span>:<span class="hljs-literal">false</span>,<span class="hljs-attr">"_shards"</span>:{<span class="hljs-attr">"total"</span>:<span class="hljs-number">1</span>,<span class="hljs-attr">"successful"</span>:<span class="hljs-number">1</span>,<span class="hljs-attr">"skipped"</span>:<span class="hljs-number">0</span>,<span class="hljs-attr">"failed"</span>:<span class="hljs-number">0</span>},<span class="hljs-attr">"hits"</span>:{<span class="hljs-attr">"total"</span>:{<span class="hljs-attr">"value"</span>:<span class="hljs-number">1</span>,<span class="hljs-attr">"relation"</span>:<span class="hljs-string">"eq"</span>},<span class="hljs-attr">"max_score"</span>:<span class="hljs-number">1.0</span>,<span class="hljs-attr">"hits"</span>:[{<span class="hljs-attr">"_index"</span>:<span class="hljs-string">"my_index"</span>,<span class="hljs-attr">"_type"</span>:<span class="hljs-string">"_doc"</span>,<span class="hljs-attr">"_id"</span>:<span class="hljs-string">"FSxemm8BLjG_iC0EeTYJ"</span>,<span class="hljs-attr">"_score"</span>:<span class="hljs-number">1.0</span>,<span class="hljs-attr">"_source"</span>:{<span class="hljs-attr">"a"</span>:<span class="hljs-number">1</span>}}]}}
</code></pre></li>
</ul></li>
</ol>
</span></div></article></div><div class="docs-prevnext"></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#feature">Feature</a><ul class="toc-headings"><li><a href="#handle-data">Handle data</a></li><li><a href="#map-multiple-indexes">Map multiple indexes</a></li><li><a href="#enable-bulk-writes">Enable bulk writes</a></li><li><a href="#enable-secure-connections-via-tls">Enable secure connections via TLS</a></li></ul></li><li><a href="#configuration">Configuration</a><ul class="toc-headings"><li><a href="#property">Property</a></li><li><a href="#definition-of-elasticsearchsslconfig-structure">Definition of ElasticSearchSslConfig structure:</a></li></ul></li><li><a href="#example">Example</a><ul class="toc-headings"><li><a href="#configuration-1">Configuration</a></li><li><a href="#usage">Usage</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>