id: io-elasticsearch-sink title: ElasticSearch sink connector sidebar_label: ElasticSearch sink connector

The ElasticSearch sink connector pulls messages from Pulsar topics and persists the messages to indexes.

Configuration

The configuration of the ElasticSearch sink connector has the following properties.

Property

NameTypeRequiredDefaultDescription
elasticSearchUrlStringtrue" " (empty string)The URL of elastic search cluster to which the connector connects.
indexNameStringtrue" " (empty string)The index name to which the connector writes messages.
typeNameStringfalse“_doc”The type name to which the connector writes messages to.

The value should be set explicitly to a valid type name other than “_doc” for Elasticsearch version before 6.2, and left to default otherwise.
indexNumberOfShardsintfalse1The number of shards of the index.
indexNumberOfReplicasintfalse1The number of replicas of the index.
usernameStringfalse" " (empty string)The username used by the connector to connect to the elastic search cluster.

If username is set, then password should also be provided.
passwordStringfalse" " (empty string)The password used by the connector to connect to the elastic search cluster.

If username is set, then password should also be provided.

Example

Before using the ElasticSearch sink connector, you need to create a configuration file through one of the following methods.

Configuration

For Elasticsearch After 6.2

  • JSON

    {
        "elasticSearchUrl": "http://localhost:9200",
        "indexName": "my_index",
        "username": "scooby",
        "password": "doobie"
    }
    
  • YAML

    configs:
        elasticSearchUrl: "http://localhost:9200"
        indexName: "my_index"
        username: "scooby"
        password: "doobie"
    

For Elasticsearch Before 6.2

  • JSON

    {
        "elasticSearchUrl": "http://localhost:9200",
        "indexName": "my_index",
        "typeName": "doc",
        "username": "scooby",
        "password": "doobie"
    }
    
  • YAML

    configs:
        elasticSearchUrl: "http://localhost:9200"
        indexName: "my_index"
        typeName: "doc"
        username: "scooby"
        password: "doobie"
    

Usage

  1. Start a single node Elasticsearch cluster.

    $ docker run -p 9200:9200 -p 9300:9300 \
        -e "discovery.type=single-node" \
        docker.elastic.co/elasticsearch/elasticsearch:7.5.1
    
  2. Start a Pulsar service locally in standalone mode.

    $ bin/pulsar standalone
    

    Make sure the nar file is available at connectors/pulsar-io-elastic-search-{{pulsar:version}}.nar.

  3. Start the Pulsar Elasticsearch connector in local run mode using one of the following methods.

    • Use the JSON configuration as shown previously.

      $ bin/pulsar-admin sinks localrun \
          --archive connectors/pulsar-io-elastic-search-{{pulsar:version}}.nar \
          --tenant public \
          --namespace default \
          --name elasticsearch-test-sink \
          --sink-type elastic_search \
          --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "scooby","password": "doobie"}' \
          --inputs elasticsearch_test
      
    • Use the YAML configuration file as shown previously.

      $ bin/pulsar-admin sinks localrun \
          --archive connectors/pulsar-io-elastic-search-{{pulsar:version}}.nar \
          --tenant public \
          --namespace default \
          --name elasticsearch-test-sink \
          --sink-type elastic_search \
          --sink-config-file elasticsearch-sink.yml \
          --inputs elasticsearch_test
      
  4. Publish records to the topic.

    $ bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}"
    
  5. Check documents in Elasticsearch.

    • refresh the index
          $ curl -s http://localhost:9200/my_index/_refresh
      
    • search documents
          $ curl -s http://localhost:9200/my_index/_search
      
      You can see the record that published earlier has been successfully written into Elasticsearch.
      {"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}