blob: 4149d14c23ab3731643e90db2cd8c5e175ca6759 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.elasticsearch6;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_DELAY;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_ENABLED;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_RETRIES;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_TYPE;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_INTERVAL;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_ACTIONS;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_SIZE;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.DISABLE_FLUSH_ON_CHECKPOINT;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.REST_MAX_RETRY_TIMEOUT;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.REST_PATH_PREFIX;
/**
 * Version-specific upsert table sink for Elasticsearch 6.
 *
 * <p>Maps the {@link SinkOption}s handed to this sink onto an Elasticsearch 6
 * {@link ElasticsearchSink.Builder} and supplies the Elasticsearch 6 flavour of the update,
 * index and delete requests emitted by the sink.
 */
@Internal
public class Elasticsearch6UpsertTableSink extends ElasticsearchUpsertTableSinkBase {

	/** Stateless factory for Elasticsearch 6 requests, shared by every instance of this sink. */
	@VisibleForTesting
	static final RequestFactory UPDATE_REQUEST_FACTORY =
		new Elasticsearch6RequestFactory();

	/**
	 * Creates an Elasticsearch 6 upsert sink whose requests are built by
	 * {@link #UPDATE_REQUEST_FACTORY}. All other arguments are forwarded unchanged to
	 * {@link ElasticsearchUpsertTableSinkBase}.
	 */
	public Elasticsearch6UpsertTableSink(
			boolean isAppendOnly,
			TableSchema schema,
			List<Host> hosts,
			String index,
			String docType,
			String keyDelimiter,
			String keyNullLiteral,
			SerializationSchema<Row> serializationSchema,
			XContentType contentType,
			ActionRequestFailureHandler failureHandler,
			Map<SinkOption, String> sinkOptions) {
		this(
			isAppendOnly,
			schema,
			hosts,
			index,
			docType,
			keyDelimiter,
			keyNullLiteral,
			serializationSchema,
			contentType,
			failureHandler,
			sinkOptions,
			UPDATE_REQUEST_FACTORY);
	}

	/**
	 * Creates a sink with an explicit {@link RequestFactory}.
	 *
	 * <p>Used by {@link #copy} so that a copy keeps the request factory it was asked to use
	 * instead of silently falling back to {@link #UPDATE_REQUEST_FACTORY}.
	 */
	private Elasticsearch6UpsertTableSink(
			boolean isAppendOnly,
			TableSchema schema,
			List<Host> hosts,
			String index,
			String docType,
			String keyDelimiter,
			String keyNullLiteral,
			SerializationSchema<Row> serializationSchema,
			XContentType contentType,
			ActionRequestFailureHandler failureHandler,
			Map<SinkOption, String> sinkOptions,
			RequestFactory requestFactory) {
		super(
			isAppendOnly,
			schema,
			hosts,
			index,
			docType,
			keyDelimiter,
			keyNullLiteral,
			serializationSchema,
			contentType,
			failureHandler,
			sinkOptions,
			requestFactory);
	}

	/**
	 * Returns a new Elasticsearch 6 sink built from the given values, including the given
	 * {@code requestFactory}. The previous implementation discarded that argument.
	 */
	@Override
	protected ElasticsearchUpsertTableSinkBase copy(
			boolean isAppendOnly,
			TableSchema schema,
			List<Host> hosts,
			String index,
			String docType,
			String keyDelimiter,
			String keyNullLiteral,
			SerializationSchema<Row> serializationSchema,
			XContentType contentType,
			ActionRequestFailureHandler failureHandler,
			Map<SinkOption, String> sinkOptions,
			RequestFactory requestFactory) {
		return new Elasticsearch6UpsertTableSink(
			isAppendOnly,
			schema,
			hosts,
			index,
			docType,
			keyDelimiter,
			keyNullLiteral,
			serializationSchema,
			contentType,
			failureHandler,
			sinkOptions,
			requestFactory);
	}

	/**
	 * Builds the Elasticsearch 6 sink function.
	 *
	 * <p>Each option present in {@code sinkOptions} is parsed from its string form and applied to
	 * the sink builder. Options that are absent leave the builder untouched.
	 * {@link SinkOption#BULK_FLUSH_MAX_SIZE} is parsed as a {@link MemorySize} and passed on as
	 * {@link MemorySize#getMebiBytes()}.
	 *
	 * @param hosts Elasticsearch hosts, converted to {@link HttpHost}s
	 * @param failureHandler handler installed on the builder for failed requests
	 * @param sinkOptions optional, string-valued sink configuration
	 * @param upsertSinkFunction function that turns rows into Elasticsearch requests
	 * @return the configured {@link ElasticsearchSink}
	 * @throws NumberFormatException if a numeric option is not a valid number
	 * @throws IllegalArgumentException if the backoff type does not name a
	 *     {@link ElasticsearchSinkBase.FlushBackoffType} constant
	 */
	@Override
	protected SinkFunction<Tuple2<Boolean, Row>> createSinkFunction(
			List<Host> hosts,
			ActionRequestFailureHandler failureHandler,
			Map<SinkOption, String> sinkOptions,
			ElasticsearchUpsertSinkFunction upsertSinkFunction) {
		final List<HttpHost> httpHosts = hosts.stream()
			.map((host) -> new HttpHost(host.hostname, host.port, host.protocol))
			.collect(Collectors.toList());
		final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> builder = createBuilder(upsertSinkFunction, httpHosts);
		builder.setFailureHandler(failureHandler);

		// The builder setters take primitives, so parse directly instead of boxing via valueOf.
		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_MAX_ACTIONS))
			.ifPresent(v -> builder.setBulkFlushMaxActions(Integer.parseInt(v)));
		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_MAX_SIZE))
			.ifPresent(v -> builder.setBulkFlushMaxSizeMb(MemorySize.parse(v).getMebiBytes()));
		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_INTERVAL))
			.ifPresent(v -> builder.setBulkFlushInterval(Long.parseLong(v)));
		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_BACKOFF_ENABLED))
			.ifPresent(v -> builder.setBulkFlushBackoff(Boolean.parseBoolean(v)));
		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_BACKOFF_TYPE))
			.ifPresent(v -> builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.valueOf(v)));
		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_BACKOFF_RETRIES))
			.ifPresent(v -> builder.setBulkFlushBackoffRetries(Integer.parseInt(v)));
		Optional.ofNullable(sinkOptions.get(BULK_FLUSH_BACKOFF_DELAY))
			.ifPresent(v -> builder.setBulkFlushBackoffDelay(Long.parseLong(v)));

		// Both REST settings are nullable; DefaultRestClientFactory skips the ones that are null.
		builder.setRestClientFactory(
			new DefaultRestClientFactory(
				Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT))
					.map(Integer::valueOf)
					.orElse(null),
				sinkOptions.get(REST_PATH_PREFIX)));

		final ElasticsearchSink<Tuple2<Boolean, Row>> sink = builder.build();

		// Boolean.parseBoolean(null) is false, so a missing option leaves the sink's
		// flush-on-checkpoint setting untouched.
		if (Boolean.parseBoolean(sinkOptions.get(DISABLE_FLUSH_ON_CHECKPOINT))) {
			sink.disableFlushOnCheckpoint();
		}
		return sink;
	}

	/**
	 * Creates the sink builder for the given hosts and upsert function.
	 *
	 * <p>This is a separate, overridable method so that tests can intercept the builder.
	 */
	@VisibleForTesting
	ElasticsearchSink.Builder<Tuple2<Boolean, Row>> createBuilder(
			ElasticsearchUpsertSinkFunction upsertSinkFunction,
			List<HttpHost> httpHosts) {
		return new ElasticsearchSink.Builder<>(httpHosts, upsertSinkFunction);
	}

	// --------------------------------------------------------------------------------------------
	// Helper classes
	// --------------------------------------------------------------------------------------------

	/**
	 * Serializable {@link RestClientFactory} used by the sink.
	 *
	 * <p>Applies only the settings that were provided (non-null) to the {@link RestClientBuilder}.
	 * Instances are immutable and compare by value.
	 */
	@VisibleForTesting
	static class DefaultRestClientFactory implements RestClientFactory {

		/** Maximum retry timeout in milliseconds, or {@code null} to keep the client default. */
		private final Integer maxRetryTimeout;

		/** Path prefix for every request, or {@code null} to keep the client default. */
		private final String pathPrefix;

		public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout, @Nullable String pathPrefix) {
			this.maxRetryTimeout = maxRetryTimeout;
			this.pathPrefix = pathPrefix;
		}

		@Override
		public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
			if (maxRetryTimeout != null) {
				restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);
			}
			if (pathPrefix != null) {
				restClientBuilder.setPathPrefix(pathPrefix);
			}
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			if (o == null || getClass() != o.getClass()) {
				return false;
			}
			DefaultRestClientFactory that = (DefaultRestClientFactory) o;
			return Objects.equals(maxRetryTimeout, that.maxRetryTimeout) &&
				Objects.equals(pathPrefix, that.pathPrefix);
		}

		@Override
		public int hashCode() {
			return Objects.hash(
				maxRetryTimeout,
				pathPrefix);
		}
	}

	/**
	 * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink.
	 */
	private static class Elasticsearch6RequestFactory implements RequestFactory {

		/**
		 * Creates an update request keyed by {@code key}.
		 *
		 * <p>The same document is passed to both {@code doc} and {@code upsert}. An existing
		 * document is updated with it, and a missing document is created from it.
		 */
		@Override
		public UpdateRequest createUpdateRequest(
				String index,
				String docType,
				String key,
				XContentType contentType,
				byte[] document) {
			return new UpdateRequest(index, docType, key)
				.doc(document, contentType)
				.upsert(document, contentType);
		}

		/** Creates an index request without an explicit document id. */
		@Override
		public IndexRequest createIndexRequest(
				String index,
				String docType,
				XContentType contentType,
				byte[] document) {
			return new IndexRequest(index, docType)
				.source(document, contentType);
		}

		/** Creates a delete request for the document identified by {@code key}. */
		@Override
		public DeleteRequest createDeleteRequest(String index, String docType, String key) {
			return new DeleteRequest(index, docType, key);
		}
	}
}