This connector provides sinks that can request document actions to an Opensearch Index. To use this connector, add the following dependency to your project:
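For example, with Maven the dependency looks roughly as follows (a sketch; the version is a placeholder, pick the connector release that matches your Flink version):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-opensearch</artifactId>
    <!-- Placeholder: use the connector version matching your Flink release -->
    <version>...</version>
</dependency>
```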
Note that the streaming connectors are currently not part of the binary distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for information about how to package the program with the libraries for cluster execution.
Instructions for setting up an Opensearch cluster can be found here.
The example below shows how to configure and create a sink:
{{< tabs "a1732edd-4218-470e-adad-b1ebb4021a12" >}}
{{< tab "Java" >}}
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.http.HttpHost;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new OpensearchSinkBuilder<String>()
        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.api.connector.sink.SinkWriter
import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, RequestIndexer}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.http.HttpHost
import org.opensearch.action.index.IndexRequest
import org.opensearch.client.Requests

import scala.collection.JavaConverters.mapAsJavaMap

val input: DataStream[String] = ...

input.sinkTo(
  new OpensearchSinkBuilder[String]
    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
      indexer.add(createIndexRequest(element)))
    .build())

def createIndexRequest(element: String): IndexRequest = {
  val json = Map(
    "data" -> element.asInstanceOf[AnyRef]
  )

  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
}
```
{{< /tab >}} {{< /tabs >}}
Note that the example only demonstrates performing a single index request for each incoming element. Generally, the `OpensearchEmitter` can be used to perform requests of different types (e.g., `DeleteRequest`, `UpdateRequest`, etc.).
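For illustration, an emitter could decide per element which kind of request to issue. The sketch below assumes a made-up element format of `"id|payload"`, where an empty payload signals a deletion; the index name and format are placeholders, not part of the connector API:

```java
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.update.UpdateRequest;

import java.util.HashMap;
import java.util.Map;

input.sinkTo(
    new OpensearchSinkBuilder<String>()
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) -> {
                // Hypothetical element format: "id|payload"
                String[] parts = element.split("\\|", 2);
                String id = parts[0];
                String payload = parts.length > 1 ? parts[1] : "";

                if (payload.isEmpty()) {
                    // An empty payload marks a tombstone: delete the document.
                    indexer.add(new DeleteRequest("my-index", id));
                } else {
                    Map<String, Object> json = new HashMap<>();
                    json.put("data", payload);
                    // Update the document if it exists, otherwise insert it.
                    indexer.add(
                        new UpdateRequest("my-index", id)
                            .doc(json)
                            .upsert(json));
                }
            })
        .build());
```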
Internally, each parallel instance of the Flink Opensearch Sink uses a `BulkProcessor` to send action requests to the cluster. This buffers elements before sending them in bulk to the cluster. The `BulkProcessor` executes bulk requests one at a time, i.e. there will be no two concurrent flushes of the buffered actions in progress.
With Flink's checkpointing enabled, the Flink Opensearch Sink guarantees at-least-once delivery of action requests to Opensearch clusters. It does so by waiting for all pending action requests in the `BulkProcessor` at the time of checkpoints. This effectively assures that all requests before the checkpoint was triggered have been successfully acknowledged by Opensearch before the sink proceeds to process more records.
More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
To use fault tolerant Opensearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
{{< tabs "aa0d1e93-4844-40d7-b0ec-9ec37e731a5f" >}}
{{< tab "Java" >}}
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
```
{{< /tab >}} {{< /tabs >}}
Opensearch action requests may fail for a variety of reasons, including temporarily saturated node queue capacity or malformed documents to be indexed. The Flink Opensearch Sink allows the user to retry requests by specifying a backoff policy.
Below is an example:
{{< tabs "adb958b3-5dd5-476e-b946-ace3335628ea" >}}
{{< tab "Java" >}}
```java
import org.apache.flink.connector.opensearch.sink.FlushBackoffType;

DataStream<String> input = ...;

input.sinkTo(
    new OpensearchSinkBuilder<String>()
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)))
        // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
        .build());
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.opensearch.sink.FlushBackoffType

val input: DataStream[String] = ...

input.sinkTo(
  new OpensearchSinkBuilder[String]
    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
      indexer.add(createIndexRequest(element)))
    // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
    .build())
```
{{< /tab >}} {{< /tabs >}}
The above example will let the sink re-add requests that failed due to resource constraints (e.g. queue capacity saturation). For all other failures, such as malformed documents, the sink will fail. If no `BulkFlushBackoffStrategy` (or `FlushBackoffType.NONE`) is configured, the sink will fail for any kind of error.
The internal `BulkProcessor` can be further configured for its behaviour on how buffered action requests are flushed, by using the following methods of the `OpensearchSinkBuilder`:

* **setBulkFlushMaxActions(int numMaxActions)**: Maximum amount of actions to buffer before flushing.
* **setBulkFlushMaxSizeMb(int maxSizeMb)**: Maximum size of data (in megabytes) to buffer before flushing.
* **setBulkFlushInterval(long intervalMillis)**: Interval at which to flush regardless of the amount or size of buffered actions.

Configuring how temporary request errors are retried is also supported, as shown in the sketch after this list:

* **setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis)**: The type of backoff to perform when a flush fails, either `CONSTANT` or `EXPONENTIAL`, the amount of backoff retries to attempt, and the amount of delay for backoff. For constant backoff, this is simply the delay between each retry. For exponential backoff, this is the initial base delay.
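For example, the flush and retry settings can be combined on a single builder. This is a minimal sketch; the concrete threshold values are placeholders, and `createIndexRequest` is the helper from the example above:

```java
import org.apache.flink.connector.opensearch.sink.FlushBackoffType;
import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
import org.apache.http.HttpHost;

DataStream<String> input = ...;

input.sinkTo(
    new OpensearchSinkBuilder<String>()
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)))
        // Flush after 1000 buffered actions, 5 MB of data, or 10 seconds, whichever comes first.
        .setBulkFlushMaxActions(1000)
        .setBulkFlushMaxSizeMb(5)
        .setBulkFlushInterval(10000)
        // Retry temporary errors at most 3 times, with a constant 1 second delay between retries.
        .setBulkFlushBackoffStrategy(FlushBackoffType.CONSTANT, 3, 1000)
        .build());
```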
More information about Opensearch can be found here.
For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies (see [here]({{< ref "docs/dev/configuration" >}}) for further information).

Alternatively, you can put the connector's jar file into Flink's `lib/` folder to make it available system-wide, i.e. for all jobs being run.
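As a minimal sketch of the uber-jar approach with Maven, the Shade plugin can bundle the connector into the job jar; the plugin version and any relocation or filtering rules are omitted here and would need to be adapted to your build:

```xml
<!-- Sketch: bundle dependencies (including flink-connector-opensearch)
     into an uber-jar during the package phase. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```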
{{< top >}}