blob: 2ce0b60f95554fa4744151f7b641600a005e5df7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.elasticsearch7;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.apache.flink.util.Preconditions;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.RestHighLevelClient;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * Elasticsearch 7.x sink that requests multiple {@link ActionRequest ActionRequests} against a
 * cluster for each incoming element.
 *
 * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch
 * cluster. The sink will fail if no cluster can be connected to using the {@link HttpHost
 * HttpHosts} supplied to the {@link Builder}.
 *
 * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest
 * ActionRequests}. This will buffer elements before sending a request to the cluster. The behaviour
 * of the {@code BulkProcessor} can be configured using these config keys:
 *
 * <ul>
 *   <li>{@code bulk.flush.max.actions}: Maximum amount of elements to buffer
 *   <li>{@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
 *   <li>{@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
 *       settings in milliseconds
 * </ul>
 *
 * <p>The setters of {@link Builder} populate this configuration, including the bulk flush backoff
 * settings.
 *
 * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
 * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation
 * of {@link ElasticsearchSinkFunction} for an example.
 *
 * @param <T> Type of the elements handled by this sink
 */
@PublicEvolving
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {

    private static final long serialVersionUID = 1L;

    /**
     * Creates the sink; instances are obtained through {@link Builder#build()}.
     *
     * @param bulkRequestsConfig configuration for the bulk processor, keyed by the {@code
     *     CONFIG_KEY_*} constants.
     * @param httpHosts the hosts the {@link RestHighLevelClient} connects to.
     * @param elasticsearchSinkFunction generates {@link ActionRequest ActionRequests} per element.
     * @param failureHandler handles failed {@link ActionRequest ActionRequests}.
     * @param restClientFactory customizes the REST client before it is built.
     */
    private ElasticsearchSink(
            Map<String, String> bulkRequestsConfig,
            List<HttpHost> httpHosts,
            ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
            ActionRequestFailureHandler failureHandler,
            RestClientFactory restClientFactory) {

        super(
                new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory),
                bulkRequestsConfig,
                elasticsearchSinkFunction,
                failureHandler);
    }

    /**
     * A builder for creating an {@link ElasticsearchSink}.
     *
     * @param <T> Type of the elements handled by the sink this builder creates.
     */
    @PublicEvolving
    public static class Builder<T> {

        private final List<HttpHost> httpHosts;
        private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;

        // Accumulates the bulk processor settings written by the setters below.
        private final Map<String, String> bulkRequestsConfig = new HashMap<>();

        // Defaults to a NoOpFailureHandler unless overridden via setFailureHandler.
        private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();

        // Default factory leaves the REST client builder untouched.
        private RestClientFactory restClientFactory = restClientBuilder -> {};

        /**
         * Creates a new builder for an {@code ElasticsearchSink} that connects to the cluster using
         * a {@link RestHighLevelClient}.
         *
         * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient}
         *     connects to.
         * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest}
         *     from the incoming element.
         * @throws NullPointerException if either argument is {@code null}.
         */
        public Builder(
                List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
            this.httpHosts = Preconditions.checkNotNull(httpHosts);
            this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
        }

        /**
         * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
         * disable it.
         *
         * @param numMaxActions the maximum number of actions to buffer per bulk request.
         * @throws IllegalArgumentException if {@code numMaxActions} is neither -1 nor positive.
         */
        public void setBulkFlushMaxActions(int numMaxActions) {
            Preconditions.checkArgument(
                    numMaxActions == -1 || numMaxActions > 0,
                    "Max number of buffered actions must be larger than 0.");

            this.bulkRequestsConfig.put(
                    CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
        }

        /**
         * Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to
         * disable it.
         *
         * @param maxSizeMb the maximum size of buffered actions, in mb.
         * @throws IllegalArgumentException if {@code maxSizeMb} is neither -1 nor positive.
         */
        public void setBulkFlushMaxSizeMb(int maxSizeMb) {
            Preconditions.checkArgument(
                    maxSizeMb == -1 || maxSizeMb > 0,
                    "Max size of buffered actions must be larger than 0.");

            this.bulkRequestsConfig.put(
                    CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
        }

        /**
         * Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
         *
         * @param intervalMillis the bulk flush interval, in milliseconds.
         * @throws IllegalArgumentException if {@code intervalMillis} is less than -1.
         */
        public void setBulkFlushInterval(long intervalMillis) {
            Preconditions.checkArgument(
                    intervalMillis == -1 || intervalMillis >= 0,
                    "Interval (in milliseconds) between each flush must be larger than or equal to 0.");

            this.bulkRequestsConfig.put(
                    CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
        }

        /**
         * Sets whether or not to enable bulk flush backoff behaviour.
         *
         * @param enabled whether or not to enable backoffs.
         */
        public void setBulkFlushBackoff(boolean enabled) {
            this.bulkRequestsConfig.put(
                    CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
        }

        /**
         * Sets the type of backoff to use when flushing bulk requests.
         *
         * @param flushBackoffType the backoff type to use.
         * @throws NullPointerException if {@code flushBackoffType} is {@code null}.
         */
        public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) {
            this.bulkRequestsConfig.put(
                    CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE,
                    Preconditions.checkNotNull(flushBackoffType).toString());
        }

        /**
         * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
         *
         * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk
         *     requests
         * @throws IllegalArgumentException if {@code maxRetries} is not positive.
         */
        public void setBulkFlushBackoffRetries(int maxRetries) {
            Preconditions.checkArgument(
                    maxRetries > 0, "Max number of backoff attempts must be larger than 0.");

            this.bulkRequestsConfig.put(
                    CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
        }

        /**
         * Sets the amount of delay between each backoff attempt when flushing bulk requests, in
         * milliseconds.
         *
         * @param delayMillis the amount of delay between each backoff attempt when flushing bulk
         *     requests, in milliseconds.
         * @throws IllegalArgumentException if {@code delayMillis} is negative.
         */
        public void setBulkFlushBackoffDelay(long delayMillis) {
            Preconditions.checkArgument(
                    delayMillis >= 0,
                    "Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");

            this.bulkRequestsConfig.put(
                    CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
        }

        /**
         * Sets a failure handler for action requests.
         *
         * @param failureHandler This is used to handle failed {@link ActionRequest}.
         * @throws NullPointerException if {@code failureHandler} is {@code null}.
         */
        public void setFailureHandler(ActionRequestFailureHandler failureHandler) {
            this.failureHandler = Preconditions.checkNotNull(failureHandler);
        }

        /**
         * Sets a REST client factory for custom client configuration.
         *
         * @param restClientFactory the factory that configures the rest client.
         * @throws NullPointerException if {@code restClientFactory} is {@code null}.
         */
        public void setRestClientFactory(RestClientFactory restClientFactory) {
            this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
        }

        /**
         * Creates the Elasticsearch sink.
         *
         * <p>The bulk request configuration is copied. Calling setters on this builder afterwards
         * therefore does not alter the configuration handed to sinks that were already built.
         *
         * @return the created Elasticsearch sink.
         */
        public ElasticsearchSink<T> build() {
            // Defensive copy: the builder is mutable and may be reused after build().
            return new ElasticsearchSink<>(
                    new HashMap<>(bulkRequestsConfig),
                    httpHosts,
                    elasticsearchSinkFunction,
                    failureHandler,
                    restClientFactory);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Builder<?> builder = (Builder<?>) o;
            return Objects.equals(httpHosts, builder.httpHosts)
                    && Objects.equals(elasticsearchSinkFunction, builder.elasticsearchSinkFunction)
                    && Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig)
                    && Objects.equals(failureHandler, builder.failureHandler)
                    && Objects.equals(restClientFactory, builder.restClientFactory);
        }

        @Override
        public int hashCode() {
            return Objects.hash(
                    httpHosts,
                    elasticsearchSinkFunction,
                    bulkRequestsConfig,
                    failureHandler,
                    restClientFactory);
        }
    }
}