blob: 0d044a732cbc3e407295c2da9251871aeaea098c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import static;
import static;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.ssl.SSLContexts;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Transforms for reading and writing data from/to Elasticsearch.
* <h3>Reading from Elasticsearch</h3>
* <p>{@link ElasticsearchIO#read} returns a bounded {@link PCollection
* PCollection&lt;String&gt;} representing JSON documents.
* <p>To configure the {@link ElasticsearchIO#read}, you have to provide a connection configuration
* containing the HTTP addresses of the Elasticsearch nodes, an index name and, optionally, a
* document type. The following example illustrates options for configuring the source:
* <pre>{@code
* pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(
*    ElasticsearchIO.ConnectionConfiguration.create(
*        new String[] {"http://host:9200"}, "my-index", "my-type")));
* }</pre>
* <p>The connection configuration also accepts optional configuration: {@code withUsername()},
* {@code withPassword()}, {@code withApiKey()}, {@code withBearerToken()} and {@code
* withDefaultHeaders()}.
* <p>You can also specify a query on the {@code read()} using {@code withQuery()}.
* <p>There are many more configuration options which can be found by looking at the with* methods
* of {@link ElasticsearchIO.Read}
* <h3>Writing to Elasticsearch</h3>
* <p>To write documents to Elasticsearch, use {@link ElasticsearchIO#write
* ElasticsearchIO.write()}, which writes JSON documents from a {@link PCollection
* PCollection&lt;String&gt;} (which can be bounded or unbounded).
* <p>{@link ElasticsearchIO.Write} involves 2 discrete steps:
* <ul>
* <li>Converting the input PCollection of valid ES documents into Bulk API directives, i.e.
* deciding whether each input document results in an update, insert or delete, with or without
* a version, routing, etc. (see {@link ElasticsearchIO.DocToBulk})
* <li>Batching Bulk API directives together and interfacing with an Elasticsearch cluster. (See
* {@link ElasticsearchIO.BulkIO})
* </ul>
* <p>In most cases, using {@link ElasticsearchIO#write} will be desirable. In some cases, one may
* want to use {@link ElasticsearchIO.DocToBulk} and {@link ElasticsearchIO.BulkIO} directly. Such
* cases might include:
* <ul>
* <li>Unit testing. Ensure that output Bulk API entities for a given set of inputs will produce
* an expected result, without the need for an available Elasticsearch cluster. See {@link
* ElasticsearchIO.Write#docToBulk}
* <li>Flexible options for data backup. Serialized Bulk API entities can be forked and sent to
* both Elasticsearch and a data lake.
* <li>Mirroring data to multiple clusters. Presently, mirroring data to multiple clusters would
* require duplicate computation.
* <li>Better batching with input streams in one job. A job may produce multiple "shapes" of Bulk
* API directives based on multiple input types, and then "fan-in" all serialized Bulk
* directives into a single BulkIO transform to improve batching semantics.
* <li>Decoupled jobs. Job(s) could be made to produce Bulk directives and then publish them to a
* message bus. A distinct job could consume from that message bus and solely be responsible
* for IO with the target cluster(s).
* </ul>
* <p>Note that the configuration options for {@link ElasticsearchIO.Write} are a union of
* configuration options for {@link ElasticsearchIO.DocToBulk} and {@link ElasticsearchIO.BulkIO}.
* <p>To configure {@link ElasticsearchIO#write ElasticsearchIO.write()}, similar to the read, you
* have to provide a connection configuration. For instance:
* <pre>{@code
* pipeline
* .apply(...)
* .apply(ElasticsearchIO.write().withConnectionConfiguration(
*    ElasticsearchIO.ConnectionConfiguration.create(
*        new String[] {"http://host:9200"}, "my-index", "my-type")));
* }</pre>
* <p>There are many more configuration options which can be found by looking at the with* methods
* of {@link ElasticsearchIO.Write}
"nullness" // TODO(
public class ElasticsearchIO {
/** Elasticsearch major versions recognized as valid by this IO. */
private static final List<Integer> VALID_CLUSTER_VERSIONS = Arrays.asList(5, 6, 7, 8);
/**
 * Subset of {@link #VALID_CLUSTER_VERSIONS} flagged as deprecated (presumably still usable but
 * reported to the user — confirm at the call site).
 */
private static final Set<Integer> DEPRECATED_CLUSTER_VERSIONS =
new HashSet<>(Arrays.asList(5, 6));
/** Accepted document versioning types (the values of Elasticsearch's {@code version_type}). */
private static final List<String> VERSION_TYPES =
Arrays.asList("internal", "external", "external_gt", "external_gte");
/** Error type string used to recognize a document version conflict in a response. */
private static final String VERSION_CONFLICT_ERROR = "version_conflict_engine_exception";
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIO.class);
/**
 * Returns a {@link Read} transform with default settings; configure it with the {@code with*}
 * methods of {@link Read}.
 */
public static Read read() {
// default scrollKeepalive = 5m as a majorant for un-predictable time between 2 start/read
// calls
// default batchSize to 100 as recommended by ES dev team as a safe value when dealing
// with big documents and still a good compromise for performances
return new AutoValue_ElasticsearchIO_Read.Builder()
/** Returns a {@link DocToBulk} transform built straight from its builder's defaults. */
public static DocToBulk docToBulk() {
return new AutoValue_ElasticsearchIO_DocToBulk.Builder().build();
/**
 * Returns a {@link BulkIO} transform whose maximum batch size starts at 5 MiB ({@code 5L * 1024L
 * * 1024L} bytes).
 */
public static BulkIO bulkIO() {
return new AutoValue_ElasticsearchIO_BulkIO.Builder()
// advised default starting batch size in ES docs
.setMaxBatchSizeBytes(5L * 1024L * 1024L)
/**
 * Returns a new {@link Write} transform, whose options are the union of those of {@link
 * DocToBulk} and {@link BulkIO}.
 */
public static Write write() {
return new Write();
/** Holder of static factories and nested transforms; not meant to be instantiated. */
private ElasticsearchIO() {}
private static final ObjectMapper mapper = new ObjectMapper();
static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
return mapper.readValue(responseEntity.getContent(), JsonNode.class);
static List<Document> createWriteReport(
HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes, boolean throwWriteErrors)
throws IOException {
List<Document> responses = new ArrayList<>();
int numErrors = 0;
JsonNode searchResult = parseResponse(responseEntity);
StringBuilder errorMessages =
new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
JsonNode items = searchResult.path("items");
if (items.isMissingNode() || items.size() == 0) {
// This would only be expected in cases like connectivity issues or similar
LOG.warn("'items' missing from Elasticsearch response: {}", errorMessages);
// some items present in bulk might have errors, concatenate error messages and record
// which items had errors
for (JsonNode item : items) {
Document result = Document.create().withResponseItemJson(item.toString());
JsonNode error = item.findValue("error");
if (error != null) {
// N.B. An empty-string within the allowedErrorTypes Set implies all errors are allowed.
String type = error.path("type").asText();
String reason = error.path("reason").asText();
String docId = item.findValue("_id").asText();
JsonNode causedBy = error.path("caused_by"); // May not be present
String cbReason = causedBy.path("reason").asText();
String cbType = causedBy.path("type").asText();
if (allowedErrorTypes == null
|| (!allowedErrorTypes.contains(type) && !allowedErrorTypes.contains(cbType))) {
// 'error' and 'causedBy` fields are not null, and the error is not being ignored.
result = result.withHasError(true);
errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
if (!causedBy.isMissingNode()) {
errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
if (numErrors > 0) {
if (throwWriteErrors) {
throw new IOException(errorMessages.toString());
return responses;
/** A POJO describing a connection configuration to Elasticsearch. */
public abstract static class ConnectionConfiguration implements Serializable {
/** Addresses of the Elasticsearch nodes; each one is parsed as a {@code URL} by the client. */
public abstract List<String> getAddresses();
/** Username put into the client's credentials provider, or {@code null} when not set. */
public abstract @Nullable String getUsername();
/** Password paired with {@link #getUsername()}, or {@code null} when not set. */
public abstract @Nullable String getPassword();
/** API key sent as an {@code Authorization: ApiKey <key>} header, or {@code null}. */
public abstract @Nullable String getApiKey();
/** Token sent as an {@code Authorization: Bearer <token>} header, or {@code null}. */
public abstract @Nullable String getBearerToken();
/** Extra headers for outgoing requests, or {@code null} when not set. */
public abstract @Nullable List<Header> getDefaultHeaders();
/** Path of the JKS keystore used to build the SSL context, or {@code null}. */
public abstract @Nullable String getKeystorePath();
/** Password of the keystore at {@link #getKeystorePath()}, or {@code null}. */
public abstract @Nullable String getKeystorePassword();
/** Path prefix under which Elasticsearch is served, or {@code null} for the root path. */
public abstract @Nullable String getPathPrefix();
/** Index toward which requests are issued; empty when no default index is configured. */
public abstract String getIndex();
/** Document type toward which requests are issued; may be null or empty. */
public abstract @Nullable String getType();
/** Socket timeout in milliseconds, or {@code null} to keep the client default. */
public abstract @Nullable Integer getSocketTimeout();
/** Connect timeout in milliseconds, or {@code null} to keep the client default. */
public abstract @Nullable Integer getConnectTimeout();
/** Whether self-signed server certificates are trusted when using SSL/TLS. */
public abstract boolean isTrustSelfSignedCerts();
/** Whether the REST client compresses requests with gzip. */
public abstract boolean isCompressionEnabled();
/**
 * Returns a builder used by the {@code with*} methods to derive a modified configuration
 * (presumably pre-populated with this instance's values by AutoValue — confirm).
 */
abstract Builder builder();
/**
 * Builder for {@link ConnectionConfiguration}; each setter stores the matching property and
 * returns the builder (implementation presumably generated by AutoValue).
 */
abstract static class Builder {
abstract Builder setAddresses(List<String> addresses);
abstract Builder setUsername(String username);
abstract Builder setPassword(String password);
abstract Builder setApiKey(String apiKey);
abstract Builder setBearerToken(String bearerToken);
abstract Builder setDefaultHeaders(List<Header> defaultHeaders);
abstract Builder setKeystorePath(String keystorePath);
abstract Builder setKeystorePassword(String password);
abstract Builder setPathPrefix(String pathPrefix);
abstract Builder setIndex(String index);
abstract Builder setType(String type);
// NOTE(review): the parameter is named maxRetryTimeout, but this sets the socket timeout.
abstract Builder setSocketTimeout(Integer maxRetryTimeout);
abstract Builder setConnectTimeout(Integer connectTimeout);
abstract Builder setTrustSelfSignedCerts(boolean trustSelfSignedCerts);
abstract Builder setCompressionEnabled(boolean compressionEnabled);
abstract ConnectionConfiguration build();
/**
 * Creates a new Elasticsearch connection configuration.
 *
 * @param addresses list of addresses of Elasticsearch nodes
 * @param index the index toward which the requests will be issued
 * @param type the document type toward which the requests will be issued
 * @return the connection configuration object
 * @throws IllegalArgumentException if {@code addresses} is null or empty, or if {@code index}
 *     or {@code type} is null
 */
public static ConnectionConfiguration create(String[] addresses, String index, String type) {
checkArgument(addresses != null, "addresses can not be null");
checkArgument(addresses.length > 0, "addresses can not be empty");
checkArgument(index != null, "index can not be null");
checkArgument(type != null, "type can not be null");
return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
/**
 * Creates a new Elasticsearch connection configuration with no default type.
 *
 * @param addresses list of addresses of Elasticsearch nodes
 * @param index the index toward which the requests will be issued
 * @return the connection configuration object
 * @throws IllegalArgumentException if {@code addresses} is null or empty, or if {@code index}
 *     is null
 */
public static ConnectionConfiguration create(String[] addresses, String index) {
checkArgument(addresses != null, "addresses can not be null");
checkArgument(addresses.length > 0, "addresses can not be empty");
checkArgument(index != null, "index can not be null");
return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
/**
 * Creates a new Elasticsearch connection configuration with no default index nor type.
 *
 * @param addresses list of addresses of Elasticsearch nodes
 * @return the connection configuration object
 * @throws IllegalArgumentException if {@code addresses} is null or empty
 */
public static ConnectionConfiguration create(String[] addresses) {
checkArgument(addresses != null, "addresses can not be null");
checkArgument(addresses.length > 0, "addresses can not be empty");
return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
* Generates the API endpoint prefix based on the set values.
* <p>Based on ConnectionConfiguration constructors, we know that one of the following is true:
* <ul>
* <li>index and type are non-empty strings
* <li>index is non-empty string, type is empty string
* <li>index and type are empty string
* </ul>
* <p>Example valid endpoints therefore include:
* <ul>
* <li>/_bulk
* <li>/index_name/_bulk
* <li>/index_name/type_name/_bulk
* </ul>
public String getApiPrefix() {
StringBuilder sb = new StringBuilder();
if (!Strings.isNullOrEmpty(getIndex())) {
if (!Strings.isNullOrEmpty(getType())) {
return sb.toString();
public String getPrefixedEndpoint(String endpoint) {
return getApiPrefix() + "/" + endpoint;
public String getBulkEndPoint() {
return getPrefixedEndpoint("_bulk");
public String getSearchEndPoint() {
return getPrefixedEndpoint("_search");
public String getCountEndPoint() {
return getPrefixedEndpoint("_count");
* If Elasticsearch authentication is enabled, provide the username.
* @param username the username used to authenticate to Elasticsearch
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withUsername(String username) {
checkArgument(username != null, "username can not be null");
checkArgument(!username.isEmpty(), "username can not be empty");
return builder().setUsername(username).build();
* If Elasticsearch authentication is enabled, provide the password.
* @param password the password used to authenticate to Elasticsearch
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withPassword(String password) {
checkArgument(password != null, "password can not be null");
checkArgument(!password.isEmpty(), "password can not be empty");
return builder().setPassword(password).build();
* If Elasticsearch is not running at the root path, e.g. 'host:9200/path/to/index', specify
* path prefix 'path/to'.
* @param pathPrefix the path prefix used in Elasticsearch instance.
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withPathPrefix(String pathPrefix) {
checkArgument(pathPrefix != null, "pathPrefix can not be null");
checkArgument(!pathPrefix.isEmpty(), "pathPrefix can not be empty");
return builder().setPathPrefix(pathPrefix).build();
* If Elasticsearch authentication is enabled, provide an API key. Be aware that you can only
* use one of {@Code withApiToken()}, {@code withBearerToken()} and {@code withDefaultHeaders}
* at the same time, as they (potentially) use the same header.
* @param apiKey the API key used to authenticate to Elasticsearch
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withApiKey(String apiKey) {
checkArgument(!Strings.isNullOrEmpty(apiKey), "apiKey can not be null or empty");
checkArgument(getBearerToken() == null, "apiKey can not be combined with bearerToken");
checkArgument(getDefaultHeaders() == null, "apiKey can not be combined with defaultHeaders");
return builder().setApiKey(apiKey).build();
* If Elasticsearch authentication is enabled, provide a bearer token. Be aware that you can
* only use one of {@Code withApiToken()}, {@code withBearerToken()} and {@code
* withDefaultHeaders} at the same time, as they (potentially) use the same header.
* @param bearerToken the bearer token used to authenticate to Elasticsearch
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withBearerToken(String bearerToken) {
checkArgument(!Strings.isNullOrEmpty(bearerToken), "bearerToken can not be null or empty");
checkArgument(getApiKey() == null, "bearerToken can not be combined with apiKey");
getDefaultHeaders() == null, "bearerToken can not be combined with defaultHeaders");
return builder().setBearerToken(bearerToken).build();
* For authentication or custom requirements, provide a set if default headers for the client.
* Be aware that you can only use one of {@code withApiToken()}, {@code withBearerToken()} and
* {@code withDefaultHeaders} at the same time, as they (potentially) use the same header.
* <p>An example of where this could be useful is if the client needs to use short-lived
* credentials that need to be renewed on a certain interval. To implement that, a user could
* implement a custom header that tracks the renewal period, for example:
* <pre>
* {@code class OAuthTokenHeader extends BasicHeader {
* OAuthToken accessToken;
* ...
* @Override
* public String getValue() {
* if (accessToken.isExpired()) {
* accessToken.renew();
* }
* return String.format("Bearer %s", accessToken.getToken());
* }
* }}
* </pre>
* @param defaultHeaders the headers to add to outgoing requests
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withDefaultHeaders(Header[] defaultHeaders) {
checkArgument(defaultHeaders != null, "defaultHeaders can not be null");
checkArgument(defaultHeaders.length > 0, "defaultHeaders can not be empty");
checkArgument(getApiKey() == null, "defaultHeaders can not be combined with apiKey");
getBearerToken() == null, "defaultHeaders can not be combined with bearerToken");
return builder().setDefaultHeaders(Arrays.asList(defaultHeaders)).build();
* If Elasticsearch uses SSL/TLS with mutual authentication (via shield), provide the keystore
* containing the client key.
* @param keystorePath the location of the keystore containing the client key.
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withKeystorePath(String keystorePath) {
checkArgument(keystorePath != null, "keystorePath can not be null");
checkArgument(!keystorePath.isEmpty(), "keystorePath can not be empty");
return builder().setKeystorePath(keystorePath).build();
* If Elasticsearch uses SSL/TLS with mutual authentication (via shield), provide the password
* to open the client keystore.
* @param keystorePassword the password of the client keystore.
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withKeystorePassword(String keystorePassword) {
checkArgument(keystorePassword != null, "keystorePassword can not be null");
return builder().setKeystorePassword(keystorePassword).build();
* If Elasticsearch uses SSL/TLS then configure whether to trust self signed certs or not. The
* default is false.
* @param trustSelfSignedCerts Whether to trust self signed certs
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withTrustSelfSignedCerts(boolean trustSelfSignedCerts) {
return builder().setTrustSelfSignedCerts(trustSelfSignedCerts).build();
* Configure whether the REST client should compress requests using gzip content encoding and
* add the "Accept-Encoding: gzip". The default is true.
* @param compressionEnabled Whether to compress requests using gzip content encoding and add
* the "Accept-Encoding: gzip"
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withCompressionEnabled(boolean compressionEnabled) {
return builder().setCompressionEnabled(compressionEnabled).build();
* If set, overwrites the default max retry timeout (30000ms) in the Elastic {@link RestClient}
* and the default socket timeout (30000ms) in the {@link RequestConfig} of the Elastic {@link
* RestClient}.
* @param socketTimeout the socket and retry timeout in millis.
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withSocketTimeout(Integer socketTimeout) {
checkArgument(socketTimeout != null, "socketTimeout can not be null");
return builder().setSocketTimeout(socketTimeout).build();
* If set, overwrites the default connect timeout (1000ms) in the {@link RequestConfig} of the
* Elastic {@link RestClient}.
* @param connectTimeout the socket and retry timeout in millis.
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
public ConnectionConfiguration withConnectTimeout(Integer connectTimeout) {
checkArgument(connectTimeout != null, "connectTimeout can not be null");
return builder().setConnectTimeout(connectTimeout).build();
private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("address", getAddresses().toString()));
builder.add(DisplayData.item("index", getIndex()));
builder.addIfNotNull(DisplayData.item("type", getType()));
builder.addIfNotNull(DisplayData.item("username", getUsername()));
builder.addIfNotNull(DisplayData.item("keystore.path", getKeystorePath()));
builder.addIfNotNull(DisplayData.item("socketTimeout", getSocketTimeout()));
builder.addIfNotNull(DisplayData.item("connectTimeout", getConnectTimeout()));
builder.addIfNotNull(DisplayData.item("trustSelfSignedCerts", isTrustSelfSignedCerts()));
builder.addIfNotNull(DisplayData.item("compressionEnabled", isCompressionEnabled()));
private SSLContext getSSLContext() throws IOException {
if (getKeystorePath() != null && !getKeystorePath().isEmpty()) {
try {
KeyStore keyStore = KeyStore.getInstance("jks");
try (InputStream is = new FileInputStream(new File(getKeystorePath()))) {
String keystorePassword = getKeystorePassword();
keyStore.load(is, (keystorePassword == null) ? null : keystorePassword.toCharArray());
final TrustStrategy trustStrategy =
isTrustSelfSignedCerts() ? new TrustSelfSignedStrategy() : null;
return SSLContexts.custom().loadTrustMaterial(keyStore, trustStrategy).build();
} catch (Exception e) {
throw new IOException("Can't load the client certificate from the keystore", e);
return null;
/**
 * Creates a low-level {@link RestClient} for the configured addresses. The branches below
 * inspect the path prefix, API key, bearer token, default headers, compression flag,
 * username/password, SSL context and connect/socket timeouts of this configuration.
 *
 * @return a new {@link RestClient}
 * @throws IOException if an address is not a valid URL or the SSL context cannot be built
 */
RestClient createClient() throws IOException {
HttpHost[] hosts = new HttpHost[getAddresses().size()];
int i = 0;
// Each address is parsed as a URL to extract host, port and scheme.
// NOTE(review): as shown, i is never incremented, so every address would land in hosts[0] —
// confirm the increment is present.
for (String address : getAddresses()) {
URL url = new URL(address);
hosts[i] = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
RestClientBuilder restClientBuilder = RestClient.builder(hosts);
if (getPathPrefix() != null) {
// Built once, outside the HTTP client callback; null when no custom TLS setup applies.
final SSLContext sslContext = getSSLContext();
// API key and bearer token are sent as "Authorization" headers (mutually exclusive).
if (getApiKey() != null) {
new Header[] {new BasicHeader("Authorization", "ApiKey " + getApiKey())});
if (getBearerToken() != null) {
new Header[] {new BasicHeader("Authorization", "Bearer " + getBearerToken())});
// User-supplied default headers are copied into an array sized to the configured list.
if (getDefaultHeaders() != null) {
Header[] headerList = new Header[getDefaultHeaders().size()];
if (isCompressionEnabled()) {
// HTTP client customization: credentials for any auth scope when a username is set, and
// an SSL session strategy when an SSL context was built.
httpClientBuilder -> {
if (getUsername() != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
AuthScope.ANY, new UsernamePasswordCredentials(getUsername(), getPassword()));
if (sslContext != null) {
final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext);
return httpClientBuilder;
// Request customization: only timeouts that were explicitly configured are overridden.
new RestClientBuilder.RequestConfigCallback() {
public RequestConfig.Builder customizeRequestConfig(
RequestConfig.Builder requestConfigBuilder) {
if (getConnectTimeout() != null) {
if (getSocketTimeout() != null) {
return requestConfigBuilder;
/** A {@link PTransform} reading data from Elasticsearch. */
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
/** Upper bound accepted by {@code withBatchSize(long)}. */
private static final long MAX_BATCH_SIZE = 10000L;
/** Property sorted on by default when Point In Time search is enabled. */
private static final String SEARCH_AFTER_DEFAULT_SORT_PROPERTY = "@timestamp";
/**
 * JSON {@code "sort"} fragment sorting ascending on one date property (format {@code
 * strict_date_optional_time_nanos}); {@code %s} is replaced by the property name.
 */
private static final String SEARCH_AFTER_SORT_TEMPLATE =
"\"sort\" : {"
+ " \"%s\" : {"
+ " \"order\" : \"asc\", "
+ " \"format\" : \"strict_date_optional_time_nanos\""
+ " }"
+ "}";
// NOTE(review): presumably SEARCH_AFTER_SORT_TEMPLATE formatted with
// SEARCH_AFTER_DEFAULT_SORT_PROPERTY; the initializer is not visible here — confirm.
private static final String SEARCH_AFTER_DEFAULT_SORT =
/** Connection to read from; {@code expand} requires it to be set. */
abstract @Nullable ConnectionConfiguration getConnectionConfiguration();
/** Query used while reading, or {@code null} when none was provided. */
abstract @Nullable ValueProvider<String> getQuery();
/** Whether metadata is included in the output, with the document source under {@code _source}. */
abstract boolean isWithMetadata();
/** Scroll keepalive; also used as the keepalive of Point In Time searches. */
abstract String getScrollKeepalive();
/** Number of documents read in each scroll read. */
abstract long getBatchSize();
/** Whether Point In Time search iteration is used instead of scrolling. */
abstract boolean getUsePITSearch();
/** Full sort configuration sent with Point In Time searches, or {@code null}. */
abstract @Nullable String getPITSortConfig();
/** Timestamp-like property used to sort Point In Time results, or {@code null}. */
abstract @Nullable String getPITSortTimestampProperty();
/** Returns a builder used by the {@code with*} methods to derive a modified {@link Read}. */
abstract Builder builder();
/** Builder for {@link Read}; each setter stores the matching property. */
abstract static class Builder {
abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
abstract Builder setQuery(ValueProvider<String> query);
abstract Builder setWithMetadata(boolean withMetadata);
abstract Builder setScrollKeepalive(String scrollKeepalive);
abstract Builder setBatchSize(long batchSize);
abstract Builder setUsePITSearch(boolean usePIT);
abstract Builder setPITSortConfig(String pitConfig);
abstract Builder setPITSortTimestampProperty(String pitTimestampProperty);
abstract Read build();
* Provide the Elasticsearch connection configuration object.
* @param connectionConfiguration a {@link ConnectionConfiguration} describes a connection
* configuration to Elasticsearch.
* @return a {@link PTransform} reading data from Elasticsearch.
public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
return builder().setConnectionConfiguration(connectionConfiguration).build();
* Provide a query used while reading from Elasticsearch.
* @param query the query. See <a
* href="">Query
* DSL</a>
* @return a {@link PTransform} reading data from Elasticsearch.
public Read withQuery(String query) {
checkArgument(query != null, "query can not be null");
checkArgument(!query.isEmpty(), "query can not be empty");
return withQuery(ValueProvider.StaticValueProvider.of(query));
* Provide a {@link ValueProvider} that provides the query used while reading from
* Elasticsearch. This is useful for cases when the query must be dynamic.
* @param query the query. See <a
* href="">Query
* DSL</a>
* @return a {@link PTransform} reading data from Elasticsearch.
public Read withQuery(ValueProvider<String> query) {
checkArgument(query != null, "query can not be null");
return builder().setQuery(query).build();
* Include metadata in result json documents. Document source will be under json node _source.
* @return a {@link PTransform} reading data from Elasticsearch.
public Read withMetadata() {
return builder().setWithMetadata(true).build();
* Provide a scroll keepalive. See <a
* href="">scroll
* API</a> Default is "5m". Change this only if you get "No search context found" errors. When
* configuring the read to use Point In Time (PIT) search this configuration is used to set the
* PIT keep alive.
* @param scrollKeepalive keepalive duration of the scroll
* @return a {@link PTransform} reading data from Elasticsearch.
public Read withScrollKeepalive(String scrollKeepalive) {
checkArgument(scrollKeepalive != null, "scrollKeepalive can not be null");
checkArgument(!"0m".equals(scrollKeepalive), "scrollKeepalive can not be 0m");
return builder().setScrollKeepalive(scrollKeepalive).build();
* Provide a size for the scroll read. See <a
* href="">
* scroll API</a> Default is 100. Maximum is 10 000. If documents are small, increasing batch
* size might improve read performance. If documents are big, you might need to decrease
* batchSize
* @param batchSize number of documents read in each scroll read
* @return a {@link PTransform} reading data from Elasticsearch.
public Read withBatchSize(long batchSize) {
batchSize > 0 && batchSize <= MAX_BATCH_SIZE,
"batchSize must be > 0 and <= %s, but was: %s",
return builder().setBatchSize(batchSize).build();
/**
 * Configures the source to use Point In Time (PIT) search iteration while reading data from
 * Elasticsearch (see the Elasticsearch "Point in time" search documentation), using default
 * settings. This iteration mode does not have the same size constraints the Scroll API has
 * (slice counts, batch size or how deep the iteration is). By default this iteration mode will
 * use a {@code @timestamp} named property on the indexed documents to consistently retrieve the
 * data when failures occur on a specific read work.
 *
 * <p>Point In Time search is only supported for clusters with version 8 and higher.
 *
 * @return a {@link PTransform} reading data from Elasticsearch.
 */
public Read withPointInTimeSearch() {
return builder()
/**
 * Similar to {@link #withPointInTimeSearch() the default PIT search} but setting an existing
 * timestamp based property name which Elasticsearch will use to sort for the results. The
 * property is sorted in ascending order.
 *
 * @param timestampSortProperty a property name found in the read documents containing a
 *     timestamp-like value.
 * @return a {@link PTransform} reading data from Elasticsearch.
 */
public Read withPointInTimeSearchAndTimestampSortProperty(String timestampSortProperty) {
return builder()
.setPITSortConfig(String.format(SEARCH_AFTER_SORT_TEMPLATE, timestampSortProperty))
/**
 * Similar to {@link #withPointInTimeSearch() the default PIT search} but setting a specific
 * sorting configuration which Elasticsearch will use to sort for the results.
 *
 * @param sortConfiguration the full sorting configuration to be sent to Elasticsearch while
 *     iterating on the results.
 * @return a {@link PTransform} reading data from Elasticsearch.
 */
public Read withPointInTimeSearchAndSortConfiguration(String sortConfiguration) {
return builder()
public PCollection<String> expand(PBegin input) {
ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
return input.apply( BoundedElasticsearchSource(this, null, null)));
public void populateDisplayData(DisplayData.Builder builder) {
// Publishes the read configuration (query, metadata flag, batch size, scroll keep-alive and
// PIT flag) as display data; addIfNotNull is used so unset values are left out.
// NOTE(review): no super.populateDisplayData(builder) call, no connection-configuration
// display data and no closing brace are visible in this copy — confirm against upstream.
builder.addIfNotNull(DisplayData.item("query", getQuery()));
builder.addIfNotNull(DisplayData.item("withMetadata", isWithMetadata()));
builder.addIfNotNull(DisplayData.item("batchSize", getBatchSize()));
builder.addIfNotNull(DisplayData.item("scrollKeepalive", getScrollKeepalive()));
builder.addIfNotNull(DisplayData.item("usePointInTimeSearch", getUsePITSearch()));
void validatePITConfiguration(int backendVersion) {
getUsePITSearch() && backendVersion >= 8,
"Point in time searches are supported for clusters with version 8 and higher.");
/** A {@link BoundedSource} reading from Elasticsearch, optionally restricted to one slice. */
public static class BoundedElasticsearchSource extends BoundedSource<String> {
// Major version of the target cluster: assigned in split() or passed to the split-time
// constructor; the 3-argument constructor leaves it at the int default of 0.
private int backendVersion;
private final Read spec;
// Slice count and slice index for sliced reads; both null for the unsplit source that
// Read.expand() creates.
private final @Nullable Integer numSlices;
private final @Nullable Integer sliceId;
// Cached result of getEstimatedSizeBytes(); null until first computed (or given by split()).
private @Nullable Long estimatedByteSize;
// Constructor used by split(), where the backend version and the per-slice size estimate are
// already known.
private BoundedElasticsearchSource(
    Read spec,
    @Nullable Integer numSlices,
    @Nullable Integer sliceId,
    @Nullable Long estimatedByteSize,
    int backendVersion) {
  this.spec = spec;
  this.numSlices = numSlices;
  this.sliceId = sliceId;
  this.estimatedByteSize = estimatedByteSize;
  this.backendVersion = backendVersion;
}
// Constructor for a source whose backend version and size are not known yet: both are left
// unset (0 and null) until split() computes them.
BoundedElasticsearchSource(Read spec, @Nullable Integer numSlices, @Nullable Integer sliceId) {
  this(spec, numSlices, sliceId, null, 0);
}
public List<? extends BoundedSource<String>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
ConnectionConfiguration connectionConfiguration = spec.getConnectionConfiguration();
this.backendVersion = getBackendVersion(connectionConfiguration);
List<BoundedElasticsearchSource> sources = new ArrayList<>();
long indexSize = getEstimatedSizeBytes(options);
float nbBundlesFloat = (float) indexSize / desiredBundleSizeBytes;
int nbBundles = (int) Math.ceil(nbBundlesFloat);
// ES slice api imposes that the number of slices is <= 1024 even if it can be overloaded
if (nbBundles > 1024) {
nbBundles = 1024;
// split the index into nbBundles chunks of desiredBundleSizeBytes by creating
// nbBundles sources each reading a slice of the index
// (see
// the slice API allows to split the ES shards
// to have bundles closer to desiredBundleSizeBytes
for (int i = 0; i < nbBundles; i++) {
long estimatedByteSizeForBundle = getEstimatedSizeBytes(options) / nbBundles;
new BoundedElasticsearchSource(
spec, nbBundles, i, estimatedByteSizeForBundle, backendVersion));
return sources;
public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
if (estimatedByteSize != null) {
return estimatedByteSize;
final ConnectionConfiguration connectionConfiguration = spec.getConnectionConfiguration();
JsonNode statsJson = getStats(connectionConfiguration);
JsonNode indexStats = statsJson.path("_all").path("primaries");
long indexSize = indexStats.path("store").path("size_in_bytes").asLong();
LOG.debug("estimate source byte size: total index size {}", indexSize);
String query = spec.getQuery() != null ? spec.getQuery().get() : null;
if (query == null || query.isEmpty()) { // return index size if no query
estimatedByteSize = indexSize;
return estimatedByteSize;
long totalCount = indexStats.path("docs").path("count").asLong();
LOG.debug("estimate source byte size: total document count {}", totalCount);
if (totalCount == 0) { // The min size is 1, because DirectRunner does not like 0
estimatedByteSize = 1L;
return estimatedByteSize;
String endPoint = connectionConfiguration.getCountEndPoint();
try (RestClient restClient = connectionConfiguration.createClient()) {
long count = queryCount(restClient, endPoint, query);
LOG.debug("estimate source byte size: query document count {}", count);
if (count == 0) {
estimatedByteSize = 1L;
} else {
// We estimate the average byte size for each document is (index/totalCount)
// and then multiply the document count in the index
estimatedByteSize = (indexSize / totalCount) * count;
return estimatedByteSize;
private long queryCount(
@Nonnull RestClient restClient, @Nonnull String endPoint, @Nonnull String query)
throws IOException {
Request request = new Request("GET", endPoint);
request.setEntity(new NStringEntity(query, ContentType.APPLICATION_JSON));
JsonNode searchResult = parseResponse(restClient.performRequest(request).getEntity());
return searchResult.path("count").asLong();
static long estimateIndexSize(ConnectionConfiguration connectionConfiguration)
throws IOException {
// we use indices stats API to estimate size and list the shards
// (
// as Elasticsearch 2.x doesn't not support any way to do parallel read inside a shard
// the estimated size bytes is not really used in the split into bundles.
// However, we implement this method anyway as the runners can use it.
// NB: Elasticsearch 5.x+ now provides the slice API.
// (
// #sliced-scroll)
JsonNode statsJson = getStats(connectionConfiguration);
JsonNode indexStats = statsJson.path("_all").path("primaries");
JsonNode store = indexStats.path("store");
return store.path("size_in_bytes").asLong();
public void populateDisplayData(DisplayData.Builder builder) {
// Adds the slice coordinates of this (possibly split) source; both are null for an unsplit
// source, in which case addIfNotNull leaves them out.
// NOTE(review): the Read spec's own display data is not forwarded in the visible code —
// confirm whether spec.populateDisplayData(builder) is expected here.
builder.addIfNotNull(DisplayData.item("numSlices", numSlices));
builder.addIfNotNull(DisplayData.item("sliceId", sliceId));
public BoundedReader<String> createReader(PipelineOptions options) {
if (!spec.getUsePITSearch()) {
return new BoundedElasticsearchScrollReader(this);
} else {
return new BoundedElasticsearchPITReader(this);
// NOTE(review): validate() has no visible body in this copy of the file; confirm whether it
// is meant to delegate to the Read spec's validation.
public void validate() {
// Each element is a hit (or its _source) rendered as a JSON string, hence a UTF-8 coder.
public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
private static JsonNode getStats(ConnectionConfiguration connectionConfiguration)
throws IOException {
HashMap<String, String> params = new HashMap<>();
String endpoint = String.format("/%s/_stats", connectionConfiguration.getIndex());
try (RestClient restClient = connectionConfiguration.createClient()) {
Request request = new Request("GET", endpoint);
return parseResponse(restClient.performRequest(request).getEntity());
abstract static class BoundedElasticsearchReader extends BoundedSource.BoundedReader<String> {
// Shared page-by-page reading logic; subclasses supply the start/advance/close requests.
// NOTE(review): this counter is registered under the BoundedElasticsearchScrollReader
// namespace although it lives in the shared base class, and no READ.inc() call is visible in
// this section — confirm where it is incremented.
private static final Counter READ =
Metrics.counter(BoundedElasticsearchScrollReader.class, "es-read-document-count");
// Query used when the user configured none.
private static final String MATCH_ALL_QUERY = "{\"query\": { \"match_all\": {} }}";
protected final BoundedElasticsearchSource source;
// Created in start().
protected RestClient restClient;
// Hit returned by getCurrent(); null once no more results are available.
protected JsonNode current;
// Iterator over the remaining hits of the page currently held in memory.
protected ListIterator<JsonNode> batchIterator;
// Scroll id or PIT id, depending on the concrete reader.
protected String iteratorId;
protected BoundedElasticsearchReader(BoundedElasticsearchSource source) {
this.source = source;
// Hooks implemented by the concrete readers:
// the request sent by start(),
protected abstract Request createStartRequest();
// the request that fetches the next page,
protected abstract Request createAdvanceRequest();
// the request sent by close() to release the server-side iterator,
protected abstract Request createCloseRequest();
// handling of a response; returns whether a current document is available,
protected abstract boolean processResult(JsonNode searchResult) throws IOException;
// and extraction of the iterator id (scroll id / PIT id) from a response.
protected abstract void updateIteratorId(JsonNode searchResult);
protected String createBaseQuery() {
String query = source.spec.getQuery() != null ? source.spec.getQuery().get() : null;
if (query == null) {
query = BoundedElasticsearchReader.MATCH_ALL_QUERY;
return query;
public boolean start() throws IOException {
restClient = source.spec.getConnectionConfiguration().createClient();
Response response = restClient.performRequest(createStartRequest());
JsonNode searchResult = parseResponse(response.getEntity());
return processResult(searchResult);
public boolean advance() throws IOException {
if (batchIterator.hasNext()) {
current =;
return true;
} else {
return performAdvance();
protected boolean performAdvance() throws IOException {
Response response = restClient.performRequest(createAdvanceRequest());
JsonNode searchResult = parseResponse(response.getEntity());
return processResult(searchResult);
protected boolean readNextBatchAndReturnFirstDocument(JsonNode searchResult) {
// stop if no more data
JsonNode hits = searchResult.path("hits").path("hits");
if (hits.size() == 0) {
current = null;
batchIterator = null;
return false;
// list behind iterator is empty
List<JsonNode> batch = new ArrayList<>();
for (JsonNode hit : hits) {
batchIterator = batch.listIterator();
current =;
return true;
public String getCurrent() throws NoSuchElementException {
if (current == null) {
throw new NoSuchElementException();
boolean withMetadata = source.spec.isWithMetadata();
return withMetadata ? current.toString() : current.path("_source").toString();
public void close() throws IOException {
Request closeRequest = createCloseRequest();
// clear the selected iterator
try {
} finally {
if (restClient != null) {
public BoundedSource<String> getCurrentSource() {
// A reader always belongs to the source it was constructed with.
return source;
// Reader that pages through the results with the Elasticsearch Scroll API.
static class BoundedElasticsearchScrollReader extends BoundedElasticsearchReader {
public BoundedElasticsearchScrollReader(BoundedElasticsearchSource source) {
  // The base class only has a constructor taking the source, so it must be passed up.
  super(source);
}
protected Request createStartRequest() {
String query = createBaseQuery();
if ((source.backendVersion >= 5) && source.numSlices != null && source.numSlices > 1) {
// if there is more than one slice, add the slice to the user query
String sliceQuery =
String.format("\"slice\": {\"id\": %s,\"max\": %s}", source.sliceId, source.numSlices);
query = query.replaceFirst("\\{", "{" + sliceQuery + ",");
String endPoint = source.spec.getConnectionConfiguration().getSearchEndPoint();
Map<String, String> params = new HashMap<>();
params.put("scroll", source.spec.getScrollKeepalive());
HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
Request request = new Request("GET", endPoint);
return request;
protected Request createAdvanceRequest() {
String requestBody =
"{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}",
source.spec.getScrollKeepalive(), iteratorId);
HttpEntity scrollEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
Request request = new Request("GET", "/_search/scroll");
return request;
protected Request createCloseRequest() {
String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}", iteratorId);
HttpEntity entity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
Request request = new Request("DELETE", "/_search/scroll");
return request;
protected boolean processResult(JsonNode searchResult) throws IOException {
return readNextBatchAndReturnFirstDocument(searchResult);
protected void updateIteratorId(JsonNode searchResult) {
// Scroll responses report their id under "_scroll_id"; asText() yields "" when it is absent.
iteratorId = searchResult.path("_scroll_id").asText();
// Reader that pages through the results with a Point In Time (PIT) id plus search_after.
static class BoundedElasticsearchPITReader extends BoundedElasticsearchReader {
// JSON-serialized "sort" values of the last document read, sent as search_after on the next
// page; empty until a document has been read.
private String searchAfterProperty = "";
public BoundedElasticsearchPITReader(BoundedElasticsearchSource source) {
  // The base class only has a constructor taking the source, so it must be passed up.
  super(source);
}
private String modifyQueryForPIT(String originalQuery) {
String trimmed = originalQuery.trim();
if (trimmed.startsWith("{") && trimmed.endsWith("}")) {
return trimmed.substring(1, trimmed.length() - 1);
return originalQuery;
protected String createBaseQuery() {
return modifyQueryForPIT(super.createBaseQuery()) + ", " + source.spec.getPITSortConfig();
protected Request createStartRequest() {
String endPoint =
String.format("/%s/_pit", source.spec.getConnectionConfiguration().getIndex());
Map<String, String> params = new HashMap<>();
params.put("keep_alive", source.spec.getScrollKeepalive());
Request request = new Request("POST", endPoint);
return request;
String searchAfter() {
if (searchAfterProperty.isEmpty()) {
return "";
return String.format("\"search_after\" : %s,", searchAfterProperty);
protected Request createAdvanceRequest() {
// if there is more than one slice, add the slice to the user query
String sliceQuery =
source.numSlices > 1
? String.format(
"\"slice\" : {\"id\" : %s, \"max\" : %s},", source.sliceId, source.numSlices)
: "";
String requestBody =
+ " %s"
+ " \"size\" : %d,"
+ " %s"
+ " %s,"
+ " \"pit\": {"
+ " \"id\": \"%s\""
+ " }"
+ "}",
searchAfter(), source.spec.getBatchSize(), sliceQuery, createBaseQuery(), iteratorId);
HttpEntity pitSearchEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
Request request = new Request("POST", "/_search");
return request;
protected Request createCloseRequest() {
String requestBody = String.format("{\"id\" : \"%s\"}", iteratorId);
HttpEntity entity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
Request request = new Request("DELETE", "/_pit");
return request;
// Returns the hit's "sort" values serialized as JSON; stored as searchAfterProperty and sent
// as search_after in the next page request.
String extractSearchAfterFromDocument(JsonNode document) {
return document.path("sort").toString();
public boolean advance() throws IOException {
if (batchIterator.hasNext()) {
current =;
searchAfterProperty = extractSearchAfterFromDocument(current);
return true;
} else {
return performAdvance();
protected boolean processResult(JsonNode searchResult) throws IOException {
JsonNode hits = searchResult.path("hits");
if (hits == null || hits.isMissingNode()) {
// after creating the PIT we need to make the first request to comply with Reader API and
// try to get a first result or declare the source empty
return performAdvance();
JsonNode resultArray = hits.path("hits");
// check if results are empty
if (resultArray == null || resultArray.isEmpty()) {
return false;
// we already opened the PIT search and are processing the search results
boolean wasDocumentRead = readNextBatchAndReturnFirstDocument(searchResult);
if (wasDocumentRead && current != null) {
searchAfterProperty = extractSearchAfterFromDocument(current);
return wasDocumentRead;
protected void updateIteratorId(JsonNode searchResult) {
// For PIT reads the iterator id is the point-in-time id.
iteratorId = extractPITId(searchResult);
String extractPITId(JsonNode searchResult) {
String maybeId = searchResult.path("id").asText();
// check if this is the first request
if (maybeId != null && !maybeId.isEmpty()) {
return maybeId;
} else {
return searchResult.path("pit_id").asText();
/**
 * A POJO encapsulating a configuration for retry behavior when issuing requests to ES. A retry
 * will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes first, for
 * 429 TOO_MANY_REQUESTS errors.
 */
public abstract static class RetryConfiguration implements Serializable {
// Predicate used by create(); DefaultRetryPredicate's no-arg form.
static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();
// Maximum number of attempts.
abstract int getMaxAttempts();
// Overall time budget for retries.
abstract Duration getMaxDuration();
// Decides whether a given response should be retried.
abstract RetryPredicate getRetryPredicate();
abstract Builder builder();
// Builder for RetryConfiguration; instances are obtained through create().
abstract static class Builder {
abstract ElasticsearchIO.RetryConfiguration.Builder setMaxAttempts(int maxAttempts);
abstract ElasticsearchIO.RetryConfiguration.Builder setMaxDuration(Duration maxDuration);
abstract ElasticsearchIO.RetryConfiguration.Builder setRetryPredicate(
RetryPredicate retryPredicate);
abstract ElasticsearchIO.RetryConfiguration build();
/**
 * Creates a RetryConfiguration for {@link ElasticsearchIO} with the provided maxAttempts,
 * maxDuration and exponential-backoff based retries.
 *
 * @param maxAttempts max number of attempts.
 * @param maxDuration maximum duration for retries.
 * @return {@link RetryConfiguration} object with provided settings.
 */
public static RetryConfiguration create(int maxAttempts, Duration maxDuration) {
checkArgument(maxAttempts > 0, "maxAttempts must be greater than 0");
maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
"maxDuration must be greater than 0");
return new AutoValue_ElasticsearchIO_RetryConfiguration.Builder()
// Exposed only to allow tests to easily simulate server errors
RetryConfiguration withRetryPredicate(RetryPredicate predicate) {
checkArgument(predicate != null, "predicate must be provided");
return builder().setRetryPredicate(predicate).build();
/**
 * An interface used to control whether we retry the Elasticsearch call when a {@link Response}
 * is obtained. If {@link RetryPredicate#test(Object)} returns true, {@link Write} tries to
 * resend the requests to the Elasticsearch server if the {@link RetryConfiguration} permits it.
 */
// Serializable predicate over a response entity: true means the request should be retried.
interface RetryPredicate extends Predicate<HttpEntity>, Serializable {}
/**
 * This is the default predicate used to test if a failed ES operation should be retried. A
 * retry will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes
 * first, for TOO_MANY_REQUESTS (429) errors.
 */
static class DefaultRetryPredicate implements RetryPredicate {
private int errorCode;
DefaultRetryPredicate(int code) {
this.errorCode = code;
DefaultRetryPredicate() {
/** Returns true if the response has the error code for any mutation. */
private static boolean errorCodePresent(HttpEntity responseEntity, int errorCode) {
try {
JsonNode json = parseResponse(responseEntity);
if (json.path("errors").asBoolean()) {
for (JsonNode item : json.path("items")) {
if (item.findValue("status").asInt() == errorCode) {
return true;
} catch (IOException e) {
LOG.warn("Could not extract error codes from responseEntity {}", responseEntity);
return false;
public boolean test(HttpEntity responseEntity) {
return errorCodePresent(responseEntity, errorCode);
/** A {@link PTransform} converting docs to their Bulk API counterparts. */
public abstract static class DocToBulk
    extends PTransform<PCollection<String>, PCollection<Document>> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int DEFAULT_RETRY_ON_CONFLICT = 5; // race conditions on updates
static {
  // DocumentMetadata must be written by DocumentMetadataSerializer (only non-null metadata
  // fields are emitted); the module has to be registered on the mapper to take effect.
  SimpleModule module = new SimpleModule();
  module.addSerializer(DocumentMetadata.class, new DocumentMetadataSerializer());
  OBJECT_MAPPER.registerModule(module);
}
// Configuration captured by this transform; every value is optional (null when unset).
abstract @Nullable ConnectionConfiguration getConnectionConfiguration();
// Per-document extraction functions for the bulk action metadata.
abstract Write.@Nullable FieldValueExtractFn getIdFn();
abstract Write.@Nullable FieldValueExtractFn getIndexFn();
abstract Write.@Nullable FieldValueExtractFn getRoutingFn();
abstract Write.@Nullable FieldValueExtractFn getTypeFn();
abstract Write.@Nullable FieldValueExtractFn getDocVersionFn();
abstract @Nullable String getDocVersionType();
abstract @Nullable String getUpsertScript();
abstract @Nullable Boolean getUsePartialUpdate();
abstract @Nullable Boolean getAppendOnly();
abstract Write.@Nullable BooleanFieldValueExtractFn getIsDeleteFn();
// When null, DocToBulkFn queries the cluster through the connection configuration instead.
abstract @Nullable Integer getBackendVersion();
abstract Builder builder();
// Builder for DocToBulk; each setter records one of the values above.
abstract static class Builder {
abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
abstract Builder setIdFn(Write.FieldValueExtractFn idFunction);
abstract Builder setIndexFn(Write.FieldValueExtractFn indexFn);
abstract Builder setRoutingFn(Write.FieldValueExtractFn routingFunction);
abstract Builder setTypeFn(Write.FieldValueExtractFn typeFn);
abstract Builder setDocVersionFn(Write.FieldValueExtractFn docVersionFn);
abstract Builder setDocVersionType(String docVersionType);
abstract Builder setIsDeleteFn(Write.BooleanFieldValueExtractFn isDeleteFn);
abstract Builder setUsePartialUpdate(Boolean usePartialUpdate);
abstract Builder setAppendOnly(Boolean appendOnly);
abstract Builder setUpsertScript(String source);
abstract Builder setBackendVersion(Integer assumedBackendVersion);
abstract DocToBulk build();
/**
 * Provide the Elasticsearch connection configuration object. Only required if
 * withBackendVersion was not used, i.e. getBackendVersion() returns null.
 *
 * @param connectionConfiguration the Elasticsearch {@link ConnectionConfiguration} object
 * @return the {@link DocToBulk} with connection configuration set
 */
public DocToBulk withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
return builder().setConnectionConfiguration(connectionConfiguration).build();
/**
 * Provide a function to extract the id from the document. This id will be used as the document
 * id in Elasticsearch. Should the function throw an Exception then the batch will fail and the
 * exception will be propagated.
 *
 * @param idFn to extract the document ID
 * @return the {@link DocToBulk} with the function set
 */
public DocToBulk withIdFn(Write.FieldValueExtractFn idFn) {
checkArgument(idFn != null, "idFn must not be null");
return builder().setIdFn(idFn).build();
/**
 * Provide a function to extract the target index from the document allowing for dynamic
 * document routing. Should the function throw an Exception then the batch will fail and the
 * exception will be propagated.
 *
 * @param indexFn to extract the destination index from
 * @return the {@link DocToBulk} with the function set
 */
public DocToBulk withIndexFn(Write.FieldValueExtractFn indexFn) {
checkArgument(indexFn != null, "indexFn must not be null");
return builder().setIndexFn(indexFn).build();
/**
 * Provide a function to extract the target routing from the document allowing for dynamic
 * document routing. Should the function throw an Exception then the batch will fail and the
 * exception will be propagated.
 *
 * @param routingFn to extract the destination routing value from
 * @return the {@link DocToBulk} with the function set
 */
public DocToBulk withRoutingFn(Write.FieldValueExtractFn routingFn) {
checkArgument(routingFn != null, "routingFn must not be null");
return builder().setRoutingFn(routingFn).build();
/**
 * Provide a function to extract the target type from the document allowing for dynamic document
 * routing. Should the function throw an Exception then the batch will fail and the exception
 * will be propagated. Users are encouraged to consider carefully whether multiple types are a
 * sensible model <a href="https://www.elastic.co/blog/index-vs-type">as discussed in this
 * blog</a>.
 *
 * @param typeFn to extract the destination type from
 * @return the {@link DocToBulk} with the function set
 */
public DocToBulk withTypeFn(Write.FieldValueExtractFn typeFn) {
checkArgument(typeFn != null, "typeFn must not be null");
return builder().setTypeFn(typeFn).build();
/**
 * Provide an instruction to control whether partial updates or inserts (default) are issued to
 * Elasticsearch.
 *
 * @param usePartialUpdate set to true to issue partial updates
 * @return the {@link DocToBulk} with the partial update control set
 */
public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) {
return builder().setUsePartialUpdate(usePartialUpdate).build();
/**
 * Provide an instruction to control whether the target index should be considered append-only.
 * For append-only indexes and/or data streams, only {@code create} operations will be issued,
 * instead of {@code index}, which is the default.
 *
 * <p>{@code create} fails if a document with the same ID already exists in the target, {@code
 * index} adds or replaces a document as necessary. If no ID is provided, both operations are
 * equivalent, unless you are writing to a <a
 * href="https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html">data
 * stream</a>. Data streams only support the {@code create} operation. For more information see
 * the <a
 * href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Elasticsearch
 * documentation</a>.
 *
 * <p>Updates are not allowed, so the partial update and upsert script options are ignored; a
 * document flagged for deletion fails with an {@link IllegalStateException}.
 *
 * <p>When the documents contain
 *
 * @param appendOnly set to true to allow only document appending
 * @return the {@link DocToBulk} with the append-only control set
 */
public DocToBulk withAppendOnly(boolean appendOnly) {
return builder().setAppendOnly(appendOnly).build();
/**
 * Whether to use scripted updates and what script to use. Setting a script also disables
 * partial updates.
 *
 * @param source set to the value of the script source, painless lang
 * @return the {@link DocToBulk} with the scripted updates set
 */
public DocToBulk withUpsertScript(String source) {
if (getBackendVersion() == null || getBackendVersion() == 2) {
LOG.warn("Painless scripts are not supported on Elasticsearch clusters before version 5.0");
return builder().setUsePartialUpdate(false).setUpsertScript(source).build();
/**
 * Provide a function to extract the doc version from the document. This version number will be
 * used as the document version in Elasticsearch. Should the function throw an Exception then
 * the batch will fail and the exception will be propagated. Incompatible with update operations
 * and should only be used with withUsePartialUpdate(false).
 *
 * @param docVersionFn to extract the document version
 * @return the {@link DocToBulk} with the function set
 */
public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) {
checkArgument(docVersionFn != null, "docVersionFn must not be null");
return builder().setDocVersionFn(docVersionFn).build();
/**
 * Provide a function to extract the target operation, either upsert or delete, from the
 * document fields, allowing a dynamic bulk operation decision. When using withIsDeleteFn, take
 * care that the document's id extraction is defined using the withIdFn function, otherwise an
 * IllegalArgumentException is thrown. Should the function throw an Exception then the batch
 * will fail and the exception will be propagated.
 *
 * @param isDeleteFn set to true for deleting the specific document
 * @return the {@link DocToBulk} with the function set
 */
public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn isDeleteFn) {
checkArgument(isDeleteFn != null, "deleteFn is required");
return builder().setIsDeleteFn(isDeleteFn).build();
/**
 * Provide the version type Elasticsearch should apply to the document version extracted by
 * {@link #withDocVersionFn}; it is sent as the {@code version_type} of each bulk action.
 * Incompatible with update operations and should only be used with
 * withUsePartialUpdate(false).
 *
 * @param docVersionType the version type to use, one of {@code VERSION_TYPES}
 * @return the {@link DocToBulk} with the doc version type set
 */
public DocToBulk withDocVersionType(String docVersionType) {
"docVersionType must be one of " + "%s",
String.join(", ", VERSION_TYPES));
return builder().setDocVersionType(docVersionType).build();
/**
 * Use to set explicitly which version of Elasticsearch the destination cluster is running.
 * Providing this hint means there is no need for setting {@link
 * DocToBulk#withConnectionConfiguration}. This can also be very useful for testing purposes.
 *
 * <p>Note: if the value of {@code backendVersion} differs from the version the destination
 * cluster is running, behavior is undefined and likely to yield errors.
 *
 * @param backendVersion the major version number of the version of Elasticsearch being run in
 *     the cluster where documents will be indexed.
 * @return the {@link DocToBulk} with the Elasticsearch major version number set
 */
public DocToBulk withBackendVersion(int backendVersion) {
"Backend version may only be one of " + "%s",
String.join(", ", VERSION_TYPES));
return builder().setBackendVersion(backendVersion).build();
public PCollection<Document> expand(PCollection<String> docs) {
// Validates the configuration, then converts each input document with DocToBulkFn.
ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
Integer backendVersion = getBackendVersion();
Write.FieldValueExtractFn idFn = getIdFn();
Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
// The backend version is either given explicitly or discovered through the connection.
// NOTE(review): the precondition calls wrapping the next two argument pairs are not visible
// in this copy of the file; confirm which helper (checkState/checkArgument) each one uses.
(backendVersion != null || connectionConfiguration != null),
"withBackendVersion() or withConnectionConfiguration() is required");
// Deletions must address a document by id.
isDeleteFn == null || idFn != null,
"Id needs to be specified by withIdFn for delete operation");
return docs.apply(ParDo.of(new DocToBulkFn(this)));
// Encapsulates the elements which form the metadata for an Elasticsearch bulk operation
private static class DocumentMetadata implements Serializable {
final String index;
final String type;
final String id;
final Integer retryOnConflict;
final String routing;
final Integer backendVersion;
final String version;
final String versionType;
String index,
String type,
String id,
Integer retryOnConflict,
String routing,
Integer backendVersion,
String version,
String versionType) {
this.index = index; = id;
this.type = type;
this.retryOnConflict = retryOnConflict;
this.routing = routing;
this.backendVersion = backendVersion;
this.version = version;
this.versionType = versionType;
private static class DocumentMetadataSerializer extends StdSerializer<DocumentMetadata> {
private DocumentMetadataSerializer() {
public void serialize(DocumentMetadata value, JsonGenerator gen, SerializerProvider provider)
throws IOException {
if (value.index != null) {
gen.writeStringField("_index", value.index);
if (value.type != null) {
gen.writeStringField("_type", value.type);
if ( != null) {
if (value.routing != null) {
gen.writeStringField("routing", value.routing);
if (value.retryOnConflict != null && value.backendVersion <= 6) {
gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
if (value.retryOnConflict != null && value.backendVersion >= 7) {
gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
if (value.version != null) {
gen.writeStringField("version", value.version);
if (value.versionType != null) {
gen.writeStringField("version_type", value.versionType);
static String createBulkApiEntity(DocToBulk spec, String document, int backendVersion)
throws IOException {
String documentMetadata = "{}";
boolean isDelete = false;
if (spec.getIndexFn() != null
|| spec.getTypeFn() != null
|| spec.getIdFn() != null
|| spec.getRoutingFn() != null) {
// parse once and reused for efficiency
JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
documentMetadata = getDocumentMetadata(spec, parsedDocument, backendVersion);
if (spec.getIsDeleteFn() != null) {
isDelete = spec.getIsDeleteFn().apply(parsedDocument);
final boolean isAppendOnly = Boolean.TRUE.equals(spec.getAppendOnly());
if (isDelete) {
checkState(!isAppendOnly, "No deletions allowed for append-only indices");
// delete request used for deleting a document
return String.format("{ \"delete\" : %s }%n", documentMetadata);
if (isAppendOnly) {
return String.format("{ \"create\" : %s }%n%s%n", documentMetadata, document);
// index is an insert/upsert and update is a partial update (or insert if not
// existing)
if (Boolean.TRUE.equals(spec.getUsePartialUpdate())) {
return String.format(
"{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" : true }%n",
documentMetadata, document);
} else if (spec.getUpsertScript() != null) {
return String.format(
"{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", "
+ "\"params\": %s}, \"upsert\" : %s, \"scripted_upsert\": true}%n",
documentMetadata, spec.getUpsertScript(), document, document);
} else {
return String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document);
private static String lowerCaseOrNull(String input) {
return input == null ? null : input.toLowerCase();
/**
 * Extracts the components that comprise the document address from the document using the
 * configured {@link Write.FieldValueExtractFn}s. This allows any or all of the index, type,
 * document id, routing and version to be controlled on a per document basis. Values that are
 * not configured are omitted, so with none of them set the result is {@code {}} (unless a
 * retry-on-conflict setting applies to partial or scripted updates). The index is sanitized by
 * automatically lower-casing it, as required by Elasticsearch.
 *
 * @param spec the transform configuration providing the extraction functions
 * @param parsedDocument the json from which the index, type and id may be extracted
 * @param backendVersion major version of the target cluster
 * @return the document address as JSON
 * @throws IOException if the metadata cannot be serialized as JSON
 */
private static String getDocumentMetadata(
DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws IOException {
DocumentMetadata metadata =
new DocumentMetadata(
spec.getIndexFn() != null
? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
: null,
spec.getTypeFn() != null ? spec.getTypeFn().apply(parsedDocument) : null,
spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : null,
|| (spec.getUpsertScript() != null && !spec.getUpsertScript().isEmpty()))
: null,
spec.getRoutingFn() != null ? spec.getRoutingFn().apply(parsedDocument) : null,
spec.getDocVersionFn() != null ? spec.getDocVersionFn().apply(parsedDocument) : null,
return OBJECT_MAPPER.writeValueAsString(metadata);
/** {@link DoFn} for the {@link DocToBulk} transform. */
static class DocToBulkFn extends DoFn<String, Document> {
private final DocToBulk spec;
// Resolved in setup(): the configured version, or one queried from the cluster.
private int backendVersion;
public DocToBulkFn(DocToBulk spec) {
this.spec = spec;
public void setup() throws IOException {
if (spec.getBackendVersion() != null) {
backendVersion = spec.getBackendVersion();
} else {
backendVersion = ElasticsearchIO.getBackendVersion(spec.getConnectionConfiguration());
public void processElement(ProcessContext c) throws IOException {
String inputDoc = c.element();
String bulkDirective = createBulkApiEntity(spec, inputDoc, backendVersion);
// N.B. Saving the element timestamp for later use allows for exactly emulating
// c.output(...) because c.output is equivalent to
// c.outputWithTimestamp(..., c.timestamp())
// Coder for Document. Fields are written in a fixed order (inputDoc, bulkDirective, hasError,
// responseItemJson, timestamp) and decode() must read them back in exactly that order.
public static class DocumentCoder extends AtomicCoder<Document> implements Serializable {
// Singleton; the coder holds no state of its own.
private static final DocumentCoder INSTANCE = new DocumentCoder();
private DocumentCoder() {}
public static DocumentCoder of() {
return INSTANCE;
// Nullable String and Instant fields are wrapped in NullableCoder; hasError is always written.
public void encode(Document value, OutputStream outStream) throws IOException {
NullableCoder.of(StringUtf8Coder.of()).encode(value.getInputDoc(), outStream);
NullableCoder.of(StringUtf8Coder.of()).encode(value.getBulkDirective(), outStream);
BooleanCoder.of().encode(value.getHasError(), outStream);
NullableCoder.of(StringUtf8Coder.of()).encode(value.getResponseItemJson(), outStream);
NullableCoder.of(InstantCoder.of()).encode(value.getTimestamp(), outStream);
// Mirrors encode(): the read order must match the write order above.
public Document decode(InputStream inStream) throws IOException {
String inputDoc = NullableCoder.of(StringUtf8Coder.of()).decode(inStream);
String bulkDirective = NullableCoder.of(StringUtf8Coder.of()).decode(inStream);
boolean hasError = BooleanCoder.of().decode(inStream);
String responseItemJson = NullableCoder.of(StringUtf8Coder.of()).decode(inStream);
Instant timestamp = NullableCoder.of(InstantCoder.of()).decode(inStream);
return Document.create()
// NOTE(review): the builder calls that populate the decoded fields are not visible here.
// Immutable POJO for maintaining various states of documents and their bulk representation, plus
// response from ES for the given document and the timestamp of the data
public abstract static class Document implements Serializable {
// The JSON document from the input PCollection, if set.
public abstract @Nullable String getInputDoc();
// The newline-delimited Bulk API directive built for the input document, if set.
public abstract @Nullable String getBulkDirective();
// Whether Elasticsearch reported a (non-allowable) error for this document; false by default.
// NOTE(review): boxed Boolean getter paired with a primitive boolean setter below.
public abstract Boolean getHasError();
// The matching element of the Bulk API response "items", if set.
public abstract @Nullable String getResponseItemJson();
// Timestamp of the originating element, used to output the result in the same window.
public abstract @Nullable Instant getTimestamp();
abstract Builder toBuilder();
abstract static class Builder {
abstract Builder setInputDoc(String inputDoc);
abstract Builder setBulkDirective(String bulkDirective);
abstract Builder setHasError(boolean hasError);
abstract Builder setResponseItemJson(String responseItemJson);
abstract Builder setTimestamp(Instant timestamp);
abstract Document build();
// Returns an empty Document: hasError is false and every nullable field is unset (null).
public static Document create() {
return new AutoValue_ElasticsearchIO_Document.Builder().setHasError(false).build();
* Sets the input document i.e. desired document that will end up in Elasticsearch for this
* WriteSummary object. The inputDoc will be a document that was part of the input PCollection
* to either {@link Write} or {@link DocToBulk}
* @param inputDoc Serialized json input document destined to end up in Elasticsearch.
* @return WriteSummary with inputDocument set.
public Document withInputDoc(String inputDoc) {
return toBuilder().setInputDoc(inputDoc).build();
* Sets the bulk directive representation of an input document. This will be new-line separated
* JSON where each line is valid JSON. Typically the first line includes meta-data and
* instructions to Elasticsearch such as whether to overwrite a document, delete it, etc. and
* the second line (if present) will be the document itself. For more info please see
* @param bulkDirective Serialized new-line delimited json bulk API information.
* @return WriteSummary with bulkDirective set.
public Document withBulkDirective(String bulkDirective) {
return toBuilder().setBulkDirective(bulkDirective).build();
* Sets the element from Elasticsearch Bulk API response "items" pertaining to this
* WriteSummary.
* @param responseItemJson The Elasticsearch Bulk API response.
* @return WriteSummary with Elasticsearch Bulk API response set.
public Document withResponseItemJson(String responseItemJson) {
return toBuilder().setResponseItemJson(responseItemJson).build();
* Used to set whether or not there was an error for a given document as indicated by the
* response from Elasticsearch. Note that if using {@link Write#withAllowableResponseErrors}
* errors which are allowed will have a false value for hasError for their respective
* WriteSummary.
* @param hasError Whether or not Elasticsearch returned an error when persisting a bulk
* directive.
* @return WriteSummary with hasError set.
public Document withHasError(boolean hasError) {
return toBuilder().setHasError(hasError).build();
* Sets the timestamp of the element in the PCollection, to be used in order to output
* WriteSummary to the same window from which the inputDoc originated.
* @param timestamp The timestamp with which the WriteSummary will be output.
* @return WriteSummary with timestamp set.
public Document withTimestamp(Instant timestamp) {
return toBuilder().setTimestamp(timestamp).build();
/**
 * A {@link PTransform} writing data to Elasticsearch.
 *
 * <p>This {@link PTransform} is a convenience wrapper that both serializes documents into Bulk
 * API entities and batches those entities for writing to an Elasticsearch cluster. It is
 * effectively a thin all-in-one proxy for {@link DocToBulk} followed by {@link BulkIO}, kept for
 * convenience and backward compatibility.
 */
public static class Write extends PTransform<PCollection<String>, PCollectionTuple> {
// Extracts a String value (e.g. index, type, id, routing or version) from a parsed JSON document.
public interface FieldValueExtractFn extends SerializableFunction<JsonNode, String> {}
// Extracts a Boolean value (e.g. whether the document is a delete) from a parsed JSON document.
public interface BooleanFieldValueExtractFn extends SerializableFunction<JsonNode, Boolean> {}
// N.B. Be sure to create tuple tags as new anonymous subclasses to avoid type erasure issues
public static final TupleTag<Document> SUCCESSFUL_WRITES = new TupleTag<Document>() {};
public static final TupleTag<Document> FAILED_WRITES = new TupleTag<Document>() {};
// Write is a mutable convenience wrapper: each with* method below replaces one of these two
// delegate specs and returns this instance; expand() applies docToBulk and then bulkIO.
private DocToBulk docToBulk = new AutoValue_ElasticsearchIO_DocToBulk.Builder().build();
private BulkIO bulkIO =
new AutoValue_ElasticsearchIO_BulkIO.Builder()
// advised default starting batch size in ES docs
// advised default starting batch size in ES docs
.setMaxBatchSizeBytes(5L * 1024L * 1024L)
// NOTE(review): only part of this default builder chain is visible in this view (5 MiB
// max batch size in bytes); the remaining defaults are not shown here.
public DocToBulk getDocToBulk() {
return docToBulk;
public BulkIO getBulkIO() {
return bulkIO;
// For building Doc2Bulk
/** Refer to {@link DocToBulk#withIdFn}. */
public Write withIdFn(FieldValueExtractFn idFn) {
docToBulk = docToBulk.withIdFn(idFn);
return this;
/** Refer to {@link DocToBulk#withIndexFn}. */
public Write withIndexFn(FieldValueExtractFn indexFn) {
docToBulk = docToBulk.withIndexFn(indexFn);
return this;
/** Refer to {@link DocToBulk#withRoutingFn}. */
public Write withRoutingFn(FieldValueExtractFn routingFn) {
docToBulk = docToBulk.withRoutingFn(routingFn);
return this;
/** Refer to {@link DocToBulk#withTypeFn}. */
public Write withTypeFn(FieldValueExtractFn typeFn) {
docToBulk = docToBulk.withTypeFn(typeFn);
return this;
/** Refer to {@link DocToBulk#withDocVersionFn}. */
public Write withDocVersionFn(FieldValueExtractFn docVersionFn) {
docToBulk = docToBulk.withDocVersionFn(docVersionFn);
return this;
/** Refer to {@link DocToBulk#withDocVersionType}. */
public Write withDocVersionType(String docVersionType) {
docToBulk = docToBulk.withDocVersionType(docVersionType);
return this;
/** Refer to {@link DocToBulk#withUsePartialUpdate}. */
public Write withUsePartialUpdate(boolean usePartialUpdate) {
docToBulk = docToBulk.withUsePartialUpdate(usePartialUpdate);
return this;
/** Refer to {@link DocToBulk#withAppendOnly}. */
public Write withAppendOnly(boolean appendOnly) {
docToBulk = docToBulk.withAppendOnly(appendOnly);
return this;
/** Refer to {@link DocToBulk#withUpsertScript}. */
public Write withUpsertScript(String source) {
docToBulk = docToBulk.withUpsertScript(source);
return this;
/** Refer to {@link DocToBulk#withBackendVersion}. */
public Write withBackendVersion(int backendVersion) {
docToBulk = docToBulk.withBackendVersion(backendVersion);
return this;
/** Refer to {@link DocToBulk#withIsDeleteFn}. */
public Write withIsDeleteFn(Write.BooleanFieldValueExtractFn isDeleteFn) {
docToBulk = docToBulk.withIsDeleteFn(isDeleteFn);
return this;
// End building Doc2Bulk
/** Refer to {@link BulkIO#withConnectionConfiguration}. */
public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
// Both stages receive the same configuration: DocToBulk may use it to query the backend
// version, BulkIO uses it to send requests.
docToBulk = docToBulk.withConnectionConfiguration(connectionConfiguration);
bulkIO = bulkIO.withConnectionConfiguration(connectionConfiguration);
return this;
/** Refer to {@link BulkIO#withMaxBatchSize}. */
public Write withMaxBatchSize(long batchSize) {
bulkIO = bulkIO.withMaxBatchSize(batchSize);
return this;
/** Refer to {@link BulkIO#withMaxBatchSizeBytes}. */
public Write withMaxBatchSizeBytes(long batchSizeBytes) {
bulkIO = bulkIO.withMaxBatchSizeBytes(batchSizeBytes);
return this;
/** Refer to {@link BulkIO#withRetryConfiguration}. */
public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {
bulkIO = bulkIO.withRetryConfiguration(retryConfiguration);
return this;
/** Refer to {@link BulkIO#withIgnoreVersionConflicts}. */
public Write withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
bulkIO = bulkIO.withIgnoreVersionConflicts(ignoreVersionConflicts);
return this;
/** Refer to {@link BulkIO#withUseStatefulBatches}. */
public Write withUseStatefulBatches(boolean useStatefulBatches) {
bulkIO = bulkIO.withUseStatefulBatches(useStatefulBatches);
return this;
/** Refer to {@link BulkIO#withMaxBufferingDuration}. */
public Write withMaxBufferingDuration(Duration maxBufferingDuration) {
bulkIO = bulkIO.withMaxBufferingDuration(maxBufferingDuration);
return this;
* Refer to {@link BulkIO#withMaxParallelRequestsPerWindow}.
* @deprecated use {@link Write#withMaxParallelRequests} instead
public Write withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow) {
bulkIO = bulkIO.withMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow);
return this;
/** Refer to {@link BulkIO#withMaxParallelRequests}. */
public Write withMaxParallelRequests(int maxParallelRequests) {
bulkIO = bulkIO.withMaxParallelRequests(maxParallelRequests);
return this;
/** Refer to {@link BulkIO#withAllowableResponseErrors}. */
public Write withAllowableResponseErrors(@Nullable Set<String> allowableResponseErrors) {
// A null set is replaced with an empty one before delegating.
if (allowableResponseErrors == null) {
allowableResponseErrors = new HashSet<>();
bulkIO = bulkIO.withAllowableResponseErrors(allowableResponseErrors);
return this;
/** Refer to {@link BulkIO#withThrowWriteErrors}. */
public Write withThrowWriteErrors(boolean throwWriteErrors) {
bulkIO = bulkIO.withThrowWriteErrors(throwWriteErrors);
return this;
public PCollectionTuple expand(PCollection<String> input) {
// Serialize documents into Bulk API entities, then batch and write them.
return input.apply(docToBulk).apply(bulkIO);
/**
 * A {@link PTransform} writing Bulk API entities created by {@link ElasticsearchIO.DocToBulk} to
 * an Elasticsearch cluster. Typically, using {@link ElasticsearchIO.Write} is preferred, whereas
 * using {@link ElasticsearchIO.DocToBulk} and BulkIO separately is for advanced use cases such as
 * mirroring data to multiple clusters or data lakes without recomputation.
 */
public abstract static class BulkIO extends PTransform<PCollection<Document>, PCollectionTuple> {
// SLF4J-style message logged before each retry attempt ({} is the attempt number).
static final String RETRY_ATTEMPT_LOG = "Error writing to Elasticsearch. Retry attempt[{}]";
// String.format template for the IOException thrown once retries are exhausted.
static final String RETRY_FAILED_LOG =
"Error writing to ES after %d attempt(s). No more attempts allowed";
abstract @Nullable ConnectionConfiguration getConnectionConfiguration();
abstract long getMaxBatchSize();
abstract long getMaxBatchSizeBytes();
abstract @Nullable Duration getMaxBufferingDuration();
abstract boolean getUseStatefulBatches();
abstract @Nullable Integer getMaxParallelRequestsPerWindow();
abstract int getMaxParallelRequests();
abstract @Nullable RetryConfiguration getRetryConfiguration();
abstract @Nullable Set<String> getAllowedResponseErrors();
abstract boolean getThrowWriteErrors();
// Returns a builder used by the with* methods to derive modified copies of this spec.
abstract Builder builder();
abstract static class Builder {
abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
abstract Builder setMaxBatchSize(long maxBatchSize);
abstract Builder setMaxBatchSizeBytes(long maxBatchSizeBytes);
abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration);
abstract Builder setAllowedResponseErrors(Set<String> allowedResponseErrors);
abstract Builder setMaxBufferingDuration(Duration maxBufferingDuration);
abstract Builder setUseStatefulBatches(boolean useStatefulBatches);
/** @deprecated Use {@link #setMaxParallelRequests} instead. */
abstract Builder setMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow);
abstract Builder setMaxParallelRequests(int maxParallelRequests);
abstract Builder setThrowWriteErrors(boolean throwWriteErrors);
abstract BulkIO build();
* Provide the Elasticsearch connection configuration object.
* @param connectionConfiguration the Elasticsearch {@link ConnectionConfiguration} object
* @return the {@link BulkIO} with connection configuration set
public BulkIO withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
return builder().setConnectionConfiguration(connectionConfiguration).build();
* Provide a maximum size in number of documents for the batch see bulk API
* ( Default is
* 1000 docs (like Elasticsearch bulk size advice). See
* Depending on the
* execution engine, size of bundles may vary, this sets the maximum size. Change this if you
* need to have smaller ElasticSearch bulks.
* @param batchSize maximum batch size in number of documents
* @return the {@link BulkIO} with connection batch size set
public BulkIO withMaxBatchSize(long batchSize) {
checkArgument(batchSize > 0, "batchSize must be > 0, but was %s", batchSize);
return builder().setMaxBatchSize(batchSize).build();
* Provide a maximum size in bytes for the batch see bulk API
* ( Default is 5MB
* (like Elasticsearch bulk size advice). See
* Depending on the
* execution engine, size of bundles may vary, this sets the maximum size. Change this if you
* need to have smaller ElasticSearch bulks.
* @param batchSizeBytes maximum batch size in bytes
* @return the {@link BulkIO} with connection batch size in bytes set
public BulkIO withMaxBatchSizeBytes(long batchSizeBytes) {
checkArgument(batchSizeBytes > 0, "batchSizeBytes must be > 0, but was %s", batchSizeBytes);
return builder().setMaxBatchSizeBytes(batchSizeBytes).build();
* Provides configuration to retry a failed batch call to Elasticsearch. A batch is considered
* as failed if the underlying {@link RestClient} surfaces 429 HTTP status code as error for one
* or more of the items in the {@link Response}. Users should consider that retrying might
* compound the underlying problem which caused the initial failure. Users should also be aware
* that once retrying is exhausted the error is surfaced to the runner which <em>may</em> then
* opt to retry the current bundle in entirety or abort if the max number of retries of the
* runner is completed. Retrying uses an exponential backoff algorithm, with minimum backoff of
* 5 seconds and then surfacing the error once the maximum number of retries or maximum
* configuration duration is exceeded.
* <p>Example use:
* <pre>{@code
* ElasticsearchIO.write()
* .withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(10, Duration.standardMinutes(3))
* ...
* }</pre>
* @param retryConfiguration the rules which govern the retry behavior
* @return the {@link BulkIO} with retrying configured
public BulkIO withRetryConfiguration(RetryConfiguration retryConfiguration) {
checkArgument(retryConfiguration != null, "retryConfiguration is required");
return builder().setRetryConfiguration(retryConfiguration).build();
/**
 * Whether or not to suppress version conflict errors in a Bulk API response. This can be useful
 * if your use case involves using external version types.
 *
 * @param ignoreVersionConflicts true to suppress version conflicts, false to surface version
 *     conflict errors.
 * @return the {@link BulkIO} with version conflict handling configured
 */
public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
// NOTE(review): an already-configured set is reused as-is rather than copied, so if the branch
// below adds to it, the set held by this BulkIO (and by whoever supplied it) is mutated too;
// consider copying it first.
Set<String> allowedResponseErrors = getAllowedResponseErrors();
if (allowedResponseErrors == null) {
allowedResponseErrors = new HashSet<>();
if (ignoreVersionConflicts) {
// NOTE(review): the statement in this branch is not visible in this view; presumably it adds
// the version-conflict error type to allowedResponseErrors.
return builder().setAllowedResponseErrors(allowedResponseErrors).build();
* Provide a set of textual error types which can be contained in Bulk API response
* items[].error.type field. Any element in @param allowableResponseErrorTypes will suppress
* errors of the same type in Bulk responses.
* <p>See also
* @param allowableResponseErrorTypes
* @return the {@link BulkIO} with allowable response errors set
public BulkIO withAllowableResponseErrors(@Nullable Set<String> allowableResponseErrorTypes) {
if (allowableResponseErrorTypes == null) {
allowableResponseErrorTypes = new HashSet<>();
return builder().setAllowedResponseErrors(allowableResponseErrorTypes).build();
/**
 * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set a maximum elapsed time
 * before buffered elements are emitted to Elasticsearch as a Bulk API request. If this config is
 * not set, Bulk requests will not be issued until {@link BulkIO#getMaxBatchSize} number of
 * documents have been buffered. This may result in higher latency, in particular if your max
 * batch size is set to a large value and your pipeline input is low volume.
 *
 * <p>A maximum buffering duration requires stateful batching, so calling this method also
 * enables {@link BulkIO#withUseStatefulBatches}.
 *
 * @param maxBufferingDuration the maximum duration to wait before sending any buffered
 *     documents to Elasticsearch, regardless of maxBatchSize.
 * @return the {@link BulkIO} with maximum buffering duration set
 */
public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) {
// Warns that stateful batching is being enabled automatically.
// NOTE(review): the logging call's opening line and the rest of the builder chain after
// builder() are not visible in this view.
"Use of withMaxBufferingDuration requires withUseStatefulBatches(true). "
+ "Setting that automatically.");
return builder()
* Whether or not to use Stateful Processing to ensure bulk requests have the desired number of
* entities i.e. as close to the maxBatchSize as possible. By default without this feature
* enabled, Bulk requests will not contain more than maxBatchSize entities, but the lower bound
* of batch size is determined by Beam Runner bundle sizes, which may be as few as 1.
* @param useStatefulBatches true enables the use of Stateful Processing to ensure that batches
* are as close to the maxBatchSize as possible.
* @return the {@link BulkIO} with Stateful Processing enabled or disabled
public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
return builder().setUseStatefulBatches(useStatefulBatches).build();
/**
 * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
 * batches are maintained per-key-per-window. BE AWARE that low values for {@code
 * maxParallelRequests}, in particular if the input data has a finite number of windows, can
 * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
 * writing data to Elasticsearch, if {@code maxParallelRequests} is set to 1, there will only ever
 * be 1 request in flight. Having only a single request in flight can be beneficial for ensuring
 * an Elasticsearch cluster is not overwhelmed by parallel requests, but may not work for all use
 * cases. If this number is less than the number of maximum workers in your pipeline, the IO work
 * will result in a sub-optimal distribution of the write step with most runners.
 *
 * <p>Despite its name, this method configures the same setting as {@link
 * BulkIO#withMaxParallelRequests}.
 *
 * @param maxParallelRequests the maximum number of parallel bulk requests
 * @return the {@link BulkIO} with maximum parallel bulk requests set
 * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
 */
public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
// NOTE(review): the opening line of the argument check below is not visible in this view;
// judging by its message it rejects non-positive values.
maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be a positive integer");
// Deprecated alias: the value is stored via setMaxParallelRequests, so this method does not
// populate getMaxParallelRequestsPerWindow().
return builder().setMaxParallelRequests(maxParallelRequests).build();
/**
 * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
 * batches are maintained per-key-per-window. BE AWARE that low values for {@code
 * maxParallelRequests}, in particular if the input data has a finite number of windows, can
 * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
 * writing data to Elasticsearch, if {@code maxParallelRequests} is set to 1, there will only ever
 * be 1 request in flight. Having only a single request in flight can be beneficial for ensuring
 * an Elasticsearch cluster is not overwhelmed by parallel requests, but may not work for all use
 * cases. If this number is less than the number of maximum workers in your pipeline, the IO work
 * will result in a sub-optimal distribution of the write step with most runners.
 *
 * @param maxParallelRequests the maximum number of parallel bulk requests
 * @return the {@link BulkIO} with maximum parallel bulk requests set
 */
public BulkIO withMaxParallelRequests(int maxParallelRequests) {
// NOTE(review): the opening line of the argument check below is not visible in this view;
// judging by its message it rejects non-positive values.
maxParallelRequests > 0, "maxParallelRequests value must be a positive integer");
// Used as the number of shards by StatefulBatching (Reshuffle.AssignShardFn).
return builder().setMaxParallelRequests(maxParallelRequests).build();
* Whether to throw runtime exceptions when write (IO) errors occur. Especially useful in
* streaming pipelines where non-transient IO failures will cause infinite retries. If true, a
* runtime error will be thrown for any error found by {@link ElasticsearchIO#createWriteReport}
* and/or (which is what org.elasticsearch.client.ResponseException based
* on) found by in batch flush. If false, a {@link PCollectionTuple} will be returned with tags
* {@link Write#SUCCESSFUL_WRITES} and {@link Write#FAILED_WRITES}, each being a {@link
* PCollection} of {@link Document} representing documents which were written to Elasticsearch
* without errors and those which failed to write due to errors, respectively.
* @param throwWriteErrors whether to surface write errors as runtime exceptions or return them
* in a {@link PCollection}
* @return the {@link BulkIO} with write error treatment configured
public BulkIO withThrowWriteErrors(boolean throwWriteErrors) {
return builder().setThrowWriteErrors(throwWriteErrors).build();
/**
 * Creates batches of documents using Stateful Processing based on the user-configurable settings
 * withMaxBufferingDuration and withMaxParallelRequests.
 *
 * <p>Mostly exists for testability of withMaxParallelRequests (and its deprecated alias
 * withMaxParallelRequestsPerWindow).
 */
static class StatefulBatching
extends PTransform<PCollection<Document>, PCollection<KV<Integer, Iterable<Document>>>> {
// The BulkIO configuration supplying the buffering duration and parallelism settings.
final BulkIO spec;
private StatefulBatching(BulkIO bulkSpec) {
spec = bulkSpec;
public static StatefulBatching fromSpec(BulkIO spec) {
return new StatefulBatching(spec);
public PCollection<KV<Integer, Iterable<Document>>> expand(PCollection<Document> input) {
// NOTE(review): the initializers of groupIntoBatches and part of the pipeline below are not
// visible in this view.
GroupIntoBatches<Integer, Document> groupIntoBatches =
// A configured buffering duration replaces the default groupIntoBatches transform.
if (spec.getMaxBufferingDuration() != null) {
groupIntoBatches =
return input
// Documents are keyed into getMaxParallelRequests() shards, which bounds how many batches
// (and hence bulk requests) can be formed in parallel.
.apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequests())))
public PCollectionTuple expand(PCollection<Document> input) {
ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
// Remember the input windowing so results can be re-windowed after the write.
WindowFn<Document, ?> originalWindowFn =
(WindowFn<Document, ?>) input.getWindowingStrategy().getWindowFn();
PCollection<Document> docResults;
// Writes happen in the global window.
PCollection<Document> globalDocs =
input.apply("Window inputs globally", Window.into(new GlobalWindows()));
if (getUseStatefulBatches()) {
// Stateful path: documents arrive pre-grouped and BulkIOStatefulFn flushes them.
// NOTE(review): part of this pipeline is not visible in this view.
docResults =
.apply(ParDo.of(new BulkIOStatefulFn(this)));
} else {
// Bundle path: BulkIOBundleFn batches documents within each bundle.
docResults = globalDocs.apply(ParDo.of(new BulkIOBundleFn(this)));
return docResults
// Restore windowing of input
.apply("Restore original windows", Window.into(originalWindowFn))
// Split results into SUCCESSFUL_WRITES and FAILED_WRITES.
ParDo.of(new ResultFilteringFn())
.withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
private static class ResultFilteringFn extends DoFn<Document, Document> {
// Routes each Document by its hasError flag. NOTE(review): the output calls are not visible in
// this view; presumably errors go to FAILED_WRITES and the rest to SUCCESSFUL_WRITES.
public void processElement(@Element Document doc, MultiOutputReceiver out) {
if (doc.getHasError()) {
} else {
static class BulkIOBundleFn extends BulkIOBaseFn<Document> {
// Batches documents within a bundle; anything left over is flushed in finishBundle().
BulkIOBundleFn(BulkIO bulkSpec) {
public void processElement(ProcessContext context) throws Exception {
// The element is a Document holding the raw input doc and its Bulk API formatted
// newline-delimited JSON, built according to DocToBulk settings
addAndMaybeFlush(context.element(), context);
/**
 * {@link DoFn} that flushes pre-grouped batches of documents. Intended for use in conjunction
 * with {@link GroupIntoBatches}.
 */
static class BulkIOStatefulFn extends BulkIOBaseFn<KV<Integer, Iterable<Document>>> {
BulkIOStatefulFn(BulkIO bulkSpec) {
public void processElement(ProcessContext context) throws Exception {
// Feed every document of the grouped batch through the shared buffering logic.
for (Document timedDoc : context.element().getValue()) {
addAndMaybeFlush(timedDoc, context);
/** {@link DoFn} for the {@link BulkIO} transform. */
private abstract static class BulkIOBaseFn<T> extends DoFn<T, Document> {
// Initial retry backoff (5 seconds, the minimum documented on withRetryConfiguration).
private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds(5);
// Transient state is (re)created on the worker: client and backoff in setup(), batch in
// startBundle().
private transient FluentBackoff retryBackoff;
private BulkIO spec;
private transient RestClient restClient;
private transient List<Document> batch;
// Running UTF-8 byte size of the bulk directives currently buffered.
long currentBatchSizeBytes;
protected BulkIOBaseFn(BulkIO bulkSpec) {
this.spec = bulkSpec;
public Duration getAllowedTimestampSkew() {
return Duration.millis(Long.MAX_VALUE);
public void setup() throws IOException {
// Creates the RestClient from the spec's connection configuration.
ConnectionConfiguration connectionConfiguration = spec.getConnectionConfiguration();
restClient = connectionConfiguration.createClient();
retryBackoff =
// With a retry configuration the backoff is rebuilt from it; maxAttempts counts the initial
// request, hence maxAttempts - 1 retries.
if (spec.getRetryConfiguration() != null) {
retryBackoff =
.withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1)
public void startBundle(StartBundleContext context) {
batch = new ArrayList<>();
currentBatchSizeBytes = 0;
public void finishBundle(FinishBundleContext context)
throws IOException, InterruptedException {
flushAndOutputResults(new FinishBundleContextAdapter<>(context));
* Adapter interface which provides a common parent for {@link ProcessContext} and {@link
* FinishBundleContext} so that we are able to use a single common invocation to output from.
interface ContextAdapter {
void output(Document timedDoc);
private static final class ProcessContextAdapter<T> implements ContextAdapter {
private final DoFn<T, Document>.ProcessContext context;
private ProcessContextAdapter(DoFn<T, Document>.ProcessContext context) {
this.context = context;
public void output(Document timedDoc) {
context.outputWithTimestamp(timedDoc, timedDoc.getTimestamp());
private static final class FinishBundleContextAdapter<T> implements ContextAdapter {
private final DoFn<T, Document>.FinishBundleContext context;
private FinishBundleContextAdapter(DoFn<T, Document>.FinishBundleContext context) {
this.context = context;
public void output(Document timedDoc) {
context.output(timedDoc, timedDoc.getTimestamp(), GlobalWindow.INSTANCE);
private void flushAndOutputResults(ContextAdapter context)
throws IOException, InterruptedException {
// Sends the buffered batch and iterates the resulting Documents for output via the adapter
// (the loop body is not visible in this view).
for (Document timedDoc : flushBatch()) {
protected void addAndMaybeFlush(Document doc, ProcessContext context)
throws IOException, InterruptedException {
// NOTE(review): the statement adding doc to batch is not visible in this view.
// NOTE(review): getBulkDirective() is @Nullable on Document; a null directive would throw
// NullPointerException here.
currentBatchSizeBytes += doc.getBulkDirective().getBytes(StandardCharsets.UTF_8).length;
// Flush as soon as either the document-count or the byte-size limit is reached.
if (batch.size() >= spec.getMaxBatchSize()
|| currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) {
flushAndOutputResults(new ProcessContextAdapter<>(context));
private boolean isRetryableClientException(Throwable t) {
// RestClient#performRequest only throws wrapped IOException so we must inspect the
// exception cause to determine if the exception is likely transient i.e. retryable or
// not.
// Retry for 500-range response code except for 501.
if (t.getCause() instanceof ResponseException) {
ResponseException ex = (ResponseException) t.getCause();
int statusCode = ex.getResponse().getStatusLine().getStatusCode();
return statusCode >= 500 && statusCode != 501;
return t.getCause() instanceof ConnectTimeoutException
|| t.getCause() instanceof SocketTimeoutException
|| t.getCause() instanceof ConnectionClosedException
|| t.getCause() instanceof ConnectException;
private List<Document> flushBatch() throws IOException, InterruptedException {
// Nothing buffered: nothing to send.
if (batch.isEmpty()) {
return new ArrayList<>();
"ElasticsearchIO batch size: {}, batch size bytes: {}",
StringBuilder bulkRequest = new StringBuilder();
// Create a stable list of input entries, because order is important to keep constant
List<Document> inputEntries = new ArrayList<>(batch);
// Reset the running byte count for the next batch.
currentBatchSizeBytes = 0L;
for (Document doc : inputEntries) {
// N.B. we need to ensure that we can iterate in the same order later to match up
// responses to these bulk directives. ES Bulk response `items` is in the same order
// as the bulk directives in the request, so order is imperative.
Response response = null;
HttpEntity responseEntity = null;
// Elasticsearch will default to the index/type provided the {@link
// ConnectionConfiguration} if none are set in the document meta (i.e.
// using ElasticsearchIO$DocToBulk#withIndexFn and
// ElasticsearchIO$DocToBulk#withTypeFn options)
String endPoint = spec.getConnectionConfiguration().getBulkEndPoint();
HttpEntity requestBody =
new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
// Set when the request fails with an exception that is neither retried nor rethrown.
String elasticResponseExceptionMessage = null;
try {
Request request = new Request("POST", endPoint);
response = restClient.performRequest(request);
// Buffered so the entity can be read by both the retry predicate and the response parsing.
responseEntity = new BufferedHttpEntity(response.getEntity());
} catch ( ex) {
// Without a retry configuration, or for non-transient failures: rethrow, or record the
// message so the whole batch is reported as failed.
if (spec.getRetryConfiguration() == null || !isRetryableClientException(ex)) {
if (spec.getThrowWriteErrors()) {
throw ex;
} else {
elasticResponseExceptionMessage = ex.getMessage();
} else {
// Transient failure with retries configured: handled by handleRetry below.
LOG.error("Caught ES timeout, retrying", ex);
// Retry when configured and no error was recorded, if there is no usable response or the
// retry predicate matches the response.
if (spec.getRetryConfiguration() != null
&& elasticResponseExceptionMessage == null
&& (response == null
|| responseEntity == null
|| spec.getRetryConfiguration().getRetryPredicate().test(responseEntity))) {
if (responseEntity != null
&& spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
// NOTE(review): "HTP" in the log message below is a typo for "HTTP".
LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS.");
responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
List<Document> responses;
// If an exception was recorded (write errors not thrown), return all input Documents with
// withHasError(true)
// so that they could be caught by FAILED_WRITES tag.
if (elasticResponseExceptionMessage != null) {
// NOTE(review): the exception message appears to be interpolated into this JSON without
// escaping; a message containing quotes or backslashes would yield invalid JSON.
String errorJsonMessage =
"{\"message\":\" was thrown in batch flush: %s\"}",
responses =
.map(doc -> doc.withHasError(true).withResponseItemJson(errorJsonMessage))
} else {
// Match each input Document with its response item (same order as the request).
responses =
responseEntity, spec.getAllowedResponseErrors(), spec.getThrowWriteErrors());
(inputTimedDoc, responseDoc) ->
/** Retries the request according to the configured retry policy. */
private HttpEntity handleRetry(
String method, String endpoint, Map<String, String> params, HttpEntity requestBody)
throws IOException, InterruptedException {
Response response;
HttpEntity responseEntity = null;
Sleeper sleeper = Sleeper.DEFAULT;
// Backoff built from the FluentBackoff configured in setup().
BackOff backoff = retryBackoff.backoff();
int attempt = 0;
// while retry policy exists
while (, backoff)) {
LOG.warn(RETRY_ATTEMPT_LOG, ++attempt);
try {
Request request = new Request(method, endpoint);
response = restClient.performRequest(request);
responseEntity = new BufferedHttpEntity(response.getEntity());
} catch ( ex) {
if (isRetryableClientException(ex)) {
LOG.error("Caught ES timeout, retrying", ex);
// NOTE(review): after a retryable exception, responseEntity still holds the previous
// attempt's value (null on the first retry) when it reaches the predicate below, unless a
// line not visible here skips to the next iteration; confirm.
// if response has no 429 errors
if (!Objects.requireNonNull(spec.getRetryConfiguration())
.test(responseEntity)) {
return responseEntity;
} else {
// NOTE(review): "HTP" in the log message below is a typo for "HTTP".
LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS.");
// Backoff exhausted without an acceptable response.
throw new IOException(String.format(RETRY_FAILED_LOG, attempt));
public void closeClient() throws IOException {
// Releases the RestClient created in setup(), if one was created (close call not visible).
if (restClient != null) {
private static void maybeLogVersionDeprecationWarning(int clusterVersion) {
// Only versions listed in DEPRECATED_CLUSTER_VERSIONS trigger the warning.
if (DEPRECATED_CLUSTER_VERSIONS.contains(clusterVersion)) {
// NOTE(review): the logging call's opening line and its argument are not fully visible here.
"Support for Elasticsearch cluster version {} will be dropped in a future release of "
+ "the Apache Beam SDK",
static int getBackendVersion(RestClient restClient) {
try {
// The root endpoint reports cluster information, including version.number.
Request request = new Request("GET", "");
Response response = restClient.performRequest(request);
JsonNode jsonNode = parseResponse(response.getEntity());
// NOTE(review): only the first character of version.number is parsed, so a two-digit major
// version (e.g. "10.0.0") would be read as 1, and a missing/empty value would throw
// StringIndexOutOfBoundsException, which the catch below does not handle.
int backendVersion =
Integer.parseInt(jsonNode.path("version").path("number").asText().substring(0, 1));
// NOTE(review): the compatibility check using the message below is only partly visible here.
"The Elasticsearch version to connect to is %s.x. "
+ "This version of the ElasticsearchIO is only compatible with "
+ "Elasticsearch "
return backendVersion;
} catch (IOException e) {
throw new IllegalArgumentException("Cannot get Elasticsearch version", e);
static int getBackendVersion(ConnectionConfiguration connectionConfiguration) {
try (RestClient restClient = connectionConfiguration.createClient()) {
return getBackendVersion(restClient);
} catch (IOException e) {
throw new IllegalArgumentException("Cannot get Elasticsearch version", e);