| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.connectors.elasticsearch; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.api.common.functions.RuntimeContext; |
| import org.apache.flink.api.common.serialization.SerializationSchema; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.streaming.api.datastream.DataStream; |
| import org.apache.flink.streaming.api.datastream.DataStreamSink; |
| import org.apache.flink.streaming.api.functions.sink.SinkFunction; |
| import org.apache.flink.table.api.TableSchema; |
| import org.apache.flink.table.api.ValidationException; |
| import org.apache.flink.table.api.types.DataType; |
| import org.apache.flink.table.api.types.InternalType; |
| import org.apache.flink.table.sinks.TableSink; |
| import org.apache.flink.table.sinks.UpsertStreamTableSink; |
| import org.apache.flink.table.typeutils.TypeCheckUtils; |
| import org.apache.flink.table.util.TableConnectorUtil; |
| import org.apache.flink.table.util.TableSchemaUtil; |
| import org.apache.flink.types.Row; |
| import org.apache.flink.util.Preconditions; |
| |
| import org.elasticsearch.action.ActionRequest; |
| import org.elasticsearch.action.delete.DeleteRequest; |
| import org.elasticsearch.action.index.IndexRequest; |
| import org.elasticsearch.action.update.UpdateRequest; |
| import org.elasticsearch.common.xcontent.XContentType; |
| |
| import java.io.Serializable; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| |
| /** |
| * A version-agnostic Elasticsearch {@link UpsertStreamTableSink}. |
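|  * |
|  * <p>If key fields are set (see {@link #setKeyFields(String[])}), a composite document id is |
|  * built from the key field values and every row is emitted as an {@link UpdateRequest} or a |
|  * {@link DeleteRequest}. Without key fields, the sink works in append-only mode and emits each |
|  * row as an {@link IndexRequest} without an explicit document id. |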
| */ |
| @Internal |
| public abstract class ElasticsearchUpsertTableSinkBase implements UpsertStreamTableSink<Row> { |
| |
| /** Flag that indicates that only inserts are accepted. */ |
| private final boolean isAppendOnly; |
| |
| /** Schema of the table. */ |
| private final TableSchema schema; |
| |
| /** Version-agnostic hosts configuration. */ |
| private final List<Host> hosts; |
| |
| /** Default index for all requests. */ |
| private final String index; |
| |
| /** Default document type for all requests. */ |
| private final String docType; |
| |
| /** Delimiter for composite keys. */ |
| private final String keyDelimiter; |
| |
| /** String literal for null keys. */ |
| private final String keyNullLiteral; |
| |
| /** Serialization schema used for the document. */ |
| private final SerializationSchema<Row> serializationSchema; |
| |
| /** Content type describing the serialization schema. */ |
| private final XContentType contentType; |
| |
| /** Failure handler for failing {@link ActionRequest}s. */ |
| private final ActionRequestFailureHandler failureHandler; |
| |
| /** |
| * Map of optional configuration parameters for the Elasticsearch sink. The config is |
| * internal and can change at any time. |
| */ |
| private final Map<SinkOption, String> sinkOptions; |
| |
| /** |
| * Version-agnostic creation of {@link ActionRequest}s. |
| */ |
| private final RequestFactory requestFactory; |
| |
| /** Key field indices determined by the query. */ |
| private int[] keyFieldIndices = new int[0]; |
| |
| public ElasticsearchUpsertTableSinkBase( |
| boolean isAppendOnly, |
| TableSchema schema, |
| List<Host> hosts, |
| String index, |
| String docType, |
| String keyDelimiter, |
| String keyNullLiteral, |
| SerializationSchema<Row> serializationSchema, |
| XContentType contentType, |
| ActionRequestFailureHandler failureHandler, |
| Map<SinkOption, String> sinkOptions, |
| RequestFactory requestFactory) { |
| |
| this.isAppendOnly = isAppendOnly; |
| this.schema = Preconditions.checkNotNull(schema); |
| this.hosts = Preconditions.checkNotNull(hosts); |
| this.index = Preconditions.checkNotNull(index); |
| this.keyDelimiter = Preconditions.checkNotNull(keyDelimiter); |
| this.keyNullLiteral = Preconditions.checkNotNull(keyNullLiteral); |
| this.docType = Preconditions.checkNotNull(docType); |
| this.serializationSchema = Preconditions.checkNotNull(serializationSchema); |
| this.contentType = Preconditions.checkNotNull(contentType); |
| this.failureHandler = Preconditions.checkNotNull(failureHandler); |
| this.sinkOptions = Preconditions.checkNotNull(sinkOptions); |
| this.requestFactory = Preconditions.checkNotNull(requestFactory); |
| } |
| |
| @Override |
| public void setKeyFields(String[] keyNames) { |
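| 		// A null key array means the query result has no unique key; the sink then writes in append-only fashion. |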
| if (keyNames == null) { |
| this.keyFieldIndices = new int[0]; |
| return; |
| } |
| |
| final String[] fieldNames = getFieldNames(); |
| final int[] keyFieldIndices = new int[keyNames.length]; |
| for (int i = 0; i < keyNames.length; i++) { |
| keyFieldIndices[i] = -1; |
| for (int j = 0; j < fieldNames.length; j++) { |
| if (keyNames[i].equals(fieldNames[j])) { |
| keyFieldIndices[i] = j; |
| break; |
| } |
| } |
| if (keyFieldIndices[i] == -1) { |
| throw new RuntimeException("Invalid key fields: " + Arrays.toString(keyNames)); |
| } |
| } |
| |
| validateKeyTypes(keyFieldIndices); |
| |
| this.keyFieldIndices = keyFieldIndices; |
| } |
| |
| @Override |
| public void setIsAppendOnly(Boolean isAppendOnly) { |
| if (this.isAppendOnly && !isAppendOnly) { |
| throw new ValidationException( |
| "The given query is not supported by this sink because the sink is configured to " + |
| 					"operate in append mode only. Thus, it only supports insertions (no queries " + |
| "with updating results)."); |
| } |
| } |
| |
| @Override |
| public DataType getRecordType() { |
| return TableSchemaUtil.toRowType(schema); |
| } |
| |
| @Override |
| public DataStreamSink emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { |
| final ElasticsearchUpsertSinkFunction upsertFunction = |
| new ElasticsearchUpsertSinkFunction( |
| index, |
| docType, |
| keyDelimiter, |
| keyNullLiteral, |
| serializationSchema, |
| contentType, |
| requestFactory, |
| keyFieldIndices); |
| final SinkFunction<Tuple2<Boolean, Row>> sinkFunction = createSinkFunction( |
| hosts, |
| failureHandler, |
| sinkOptions, |
| upsertFunction); |
| return dataStream.addSink(sinkFunction) |
| .name(TableConnectorUtil.generateRuntimeName(this.getClass(), getFieldNames())); |
| } |
| |
| @Override |
| public String[] getFieldNames() { |
| return schema.getColumnNames(); |
| } |
| |
| @Override |
| public DataType[] getFieldTypes() { |
| return schema.getTypes(); |
| } |
| |
| @Override |
| public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, DataType[] fieldTypes) { |
| if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) { |
| throw new ValidationException("Reconfiguration with different fields is not allowed. " + |
| "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " + |
| "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes)); |
| } |
| return copy( |
| isAppendOnly, |
| schema, |
| hosts, |
| index, |
| docType, |
| keyDelimiter, |
| keyNullLiteral, |
| serializationSchema, |
| contentType, |
| failureHandler, |
| sinkOptions, |
| requestFactory); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| ElasticsearchUpsertTableSinkBase that = (ElasticsearchUpsertTableSinkBase) o; |
| return Objects.equals(isAppendOnly, that.isAppendOnly) && |
| Objects.equals(schema, that.schema) && |
| Objects.equals(hosts, that.hosts) && |
| Objects.equals(index, that.index) && |
| Objects.equals(docType, that.docType) && |
| Objects.equals(keyDelimiter, that.keyDelimiter) && |
| Objects.equals(keyNullLiteral, that.keyNullLiteral) && |
| Objects.equals(serializationSchema, that.serializationSchema) && |
| Objects.equals(contentType, that.contentType) && |
| Objects.equals(failureHandler, that.failureHandler) && |
| Objects.equals(sinkOptions, that.sinkOptions); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash( |
| isAppendOnly, |
| schema, |
| hosts, |
| index, |
| docType, |
| keyDelimiter, |
| keyNullLiteral, |
| serializationSchema, |
| contentType, |
| failureHandler, |
| sinkOptions); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // For version-specific implementations |
| // -------------------------------------------------------------------------------------------- |
| |
| protected abstract ElasticsearchUpsertTableSinkBase copy( |
| boolean isAppendOnly, |
| TableSchema schema, |
| List<Host> hosts, |
| String index, |
| String docType, |
| String keyDelimiter, |
| String keyNullLiteral, |
| SerializationSchema<Row> serializationSchema, |
| XContentType contentType, |
| ActionRequestFailureHandler failureHandler, |
| Map<SinkOption, String> sinkOptions, |
| RequestFactory requestFactory); |
| |
| protected abstract SinkFunction<Tuple2<Boolean, Row>> createSinkFunction( |
| List<Host> hosts, |
| ActionRequestFailureHandler failureHandler, |
| Map<SinkOption, String> sinkOptions, |
| ElasticsearchUpsertSinkFunction upsertFunction); |
| |
| // -------------------------------------------------------------------------------------------- |
| // Helper methods |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| 	 * Validates that the key field types can be safely converted into a string representation. |
| */ |
| private void validateKeyTypes(int[] keyFieldIndices) { |
| final DataType[] types = getFieldTypes(); |
| for (int keyFieldIndex : keyFieldIndices) { |
| final InternalType type = (InternalType) types[keyFieldIndex]; |
| if (!TypeCheckUtils.isSimpleStringRepresentation(type)) { |
| throw new ValidationException( |
| "Only simple types that can be safely converted into a string representation " + |
| "can be used as keys. But was: " + type); |
| } |
| } |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Helper classes |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Keys for optional parameterization of the sink. |
| */ |
| public enum SinkOption { |
| DISABLE_FLUSH_ON_CHECKPOINT, |
| BULK_FLUSH_MAX_ACTIONS, |
| BULK_FLUSH_MAX_SIZE, |
| BULK_FLUSH_INTERVAL, |
| BULK_FLUSH_BACKOFF_ENABLED, |
| BULK_FLUSH_BACKOFF_TYPE, |
| BULK_FLUSH_BACKOFF_RETRIES, |
| BULK_FLUSH_BACKOFF_DELAY, |
| REST_MAX_RETRY_TIMEOUT, |
| REST_PATH_PREFIX |
| } |
| |
| /** |
| 	 * Entity describing an Elasticsearch host. |
| */ |
| public static class Host { |
| public final String hostname; |
| public final int port; |
| public final String protocol; |
| |
| public Host(String hostname, int port, String protocol) { |
| this.hostname = hostname; |
| this.port = port; |
| this.protocol = protocol; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| Host host = (Host) o; |
| return port == host.port && |
| Objects.equals(hostname, host.hostname) && |
| Objects.equals(protocol, host.protocol); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash( |
| hostname, |
| port, |
| protocol); |
| } |
| } |
| |
| /** |
| 	 * For version-agnostic creation of {@link ActionRequest}s. |
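| 	 * |
| 	 * <p>A version-specific implementation might create the requests roughly as in the following |
| 	 * sketch (shown for an Elasticsearch 6.x style client; constructor and setter signatures can |
| 	 * differ between client versions): |
| 	 * |
| 	 * <pre>{@code |
| 	 * // upsert: update the document identified by the key, or insert the document if it is missing |
| 	 * new UpdateRequest(index, docType, key).doc(document, contentType).upsert(document, contentType); |
| 	 * // append: index a new document without an explicit id |
| 	 * new IndexRequest(index, docType).source(document, contentType); |
| 	 * // delete: remove the document identified by the key |
| 	 * new DeleteRequest(index, docType, key); |
| 	 * }</pre> |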
| */ |
| public interface RequestFactory extends Serializable { |
| |
| /** |
| * Creates an update request to be added to a {@link RequestIndexer}. |
| */ |
| UpdateRequest createUpdateRequest( |
| String index, |
| String docType, |
| String key, |
| XContentType contentType, |
| byte[] document); |
| |
| /** |
| * Creates an index request to be added to a {@link RequestIndexer}. |
| */ |
| IndexRequest createIndexRequest( |
| String index, |
| String docType, |
| XContentType contentType, |
| byte[] document); |
| |
| /** |
| * Creates a delete request to be added to a {@link RequestIndexer}. |
| */ |
| DeleteRequest createDeleteRequest( |
| String index, |
| String docType, |
| String key); |
| } |
| |
| /** |
| * Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. |
| */ |
| public static class ElasticsearchUpsertSinkFunction implements ElasticsearchSinkFunction<Tuple2<Boolean, Row>> { |
| |
| private final String index; |
| private final String docType; |
| private final String keyDelimiter; |
| private final String keyNullLiteral; |
| private final SerializationSchema<Row> serializationSchema; |
| private final XContentType contentType; |
| private final RequestFactory requestFactory; |
| private final int[] keyFieldIndices; |
| |
| public ElasticsearchUpsertSinkFunction( |
| String index, |
| String docType, |
| String keyDelimiter, |
| String keyNullLiteral, |
| SerializationSchema<Row> serializationSchema, |
| XContentType contentType, |
| RequestFactory requestFactory, |
| int[] keyFieldIndices) { |
| |
| this.index = Preconditions.checkNotNull(index); |
| this.docType = Preconditions.checkNotNull(docType); |
| this.keyDelimiter = Preconditions.checkNotNull(keyDelimiter); |
| this.serializationSchema = Preconditions.checkNotNull(serializationSchema); |
| this.contentType = Preconditions.checkNotNull(contentType); |
| this.keyFieldIndices = Preconditions.checkNotNull(keyFieldIndices); |
| this.requestFactory = Preconditions.checkNotNull(requestFactory); |
| this.keyNullLiteral = Preconditions.checkNotNull(keyNullLiteral); |
| } |
| |
| @Override |
| public void process(Tuple2<Boolean, Row> element, RuntimeContext ctx, RequestIndexer indexer) { |
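| 			// The tuple's boolean flag encodes the change type: true signals an upsert, false a delete. |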
| if (element.f0) { |
| processUpsert(element.f1, indexer); |
| } else { |
| processDelete(element.f1, indexer); |
| } |
| } |
| |
| private void processUpsert(Row row, RequestIndexer indexer) { |
| final byte[] document = serializationSchema.serialize(row); |
| if (keyFieldIndices.length == 0) { |
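| 				// No key fields defined: append-only mode, emit a plain index request without a document id. |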
| final IndexRequest indexRequest = requestFactory.createIndexRequest( |
| index, |
| docType, |
| contentType, |
| document); |
| indexer.add(indexRequest); |
| } else { |
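| 				// Key fields defined: derive the document id from the key values and emit an update request. |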
| final String key = createKey(row); |
| final UpdateRequest updateRequest = requestFactory.createUpdateRequest( |
| index, |
| docType, |
| key, |
| contentType, |
| document); |
| indexer.add(updateRequest); |
| } |
| } |
| |
| private void processDelete(Row row, RequestIndexer indexer) { |
| final String key = createKey(row); |
| final DeleteRequest deleteRequest = requestFactory.createDeleteRequest( |
| index, |
| docType, |
| key); |
| indexer.add(deleteRequest); |
| } |
| |
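| 		/** |
| 		 * Builds the document id by concatenating the key field values with the configured key |
| 		 * delimiter, substituting the null literal for null values. |
| 		 */ |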
| private String createKey(Row row) { |
| final StringBuilder builder = new StringBuilder(); |
| for (int i = 0; i < keyFieldIndices.length; i++) { |
| final int keyFieldIndex = keyFieldIndices[i]; |
| if (i > 0) { |
| builder.append(keyDelimiter); |
| } |
| final Object value = row.getField(keyFieldIndex); |
| if (value == null) { |
| builder.append(keyNullLiteral); |
| } else { |
| builder.append(value.toString()); |
| } |
| } |
| return builder.toString(); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| ElasticsearchUpsertSinkFunction that = (ElasticsearchUpsertSinkFunction) o; |
| return Objects.equals(index, that.index) && |
| Objects.equals(docType, that.docType) && |
| Objects.equals(keyDelimiter, that.keyDelimiter) && |
| Objects.equals(keyNullLiteral, that.keyNullLiteral) && |
| Objects.equals(serializationSchema, that.serializationSchema) && |
| contentType == that.contentType && |
| Objects.equals(requestFactory, that.requestFactory) && |
| Arrays.equals(keyFieldIndices, that.keyFieldIndices); |
| } |
| |
| @Override |
| public int hashCode() { |
| int result = Objects.hash( |
| index, |
| docType, |
| keyDelimiter, |
| keyNullLiteral, |
| serializationSchema, |
| contentType, |
| requestFactory); |
| result = 31 * result + Arrays.hashCode(keyFieldIndices); |
| return result; |
| } |
| } |
| } |