| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.opensearch.table; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.api.common.serialization.SerializationSchema; |
| import org.apache.flink.connector.opensearch.sink.FlushBackoffType; |
| import org.apache.flink.connector.opensearch.sink.Opensearch2Sink; |
| import org.apache.flink.connector.opensearch.sink.Opensearch2SinkBuilder; |
| import org.apache.flink.table.api.ValidationException; |
| import org.apache.flink.table.connector.ChangelogMode; |
| import org.apache.flink.table.connector.format.EncodingFormat; |
| import org.apache.flink.table.connector.sink.DynamicTableSink; |
| import org.apache.flink.table.connector.sink.SinkV2Provider; |
| import org.apache.flink.table.data.RowData; |
| import org.apache.flink.table.types.DataType; |
| import org.apache.flink.types.RowKind; |
| import org.apache.flink.util.StringUtils; |
| |
| import org.apache.http.HttpHost; |
| import org.opensearch.common.xcontent.XContentType; |
| |
| import java.time.ZoneId; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.function.Function; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * A {@link DynamicTableSink} that describes how to create a {@link Opensearch2Sink} from a logical |
| * description. |
| */ |
| @Internal |
| class Opensearch2DynamicSink implements DynamicTableSink { |
| |
| final EncodingFormat<SerializationSchema<RowData>> format; |
| final DataType physicalRowDataType; |
| final List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex; |
| final OpensearchConfiguration config; |
| final ZoneId localTimeZoneId; |
| |
| final String summaryString; |
| final boolean isDynamicIndexWithSystemTime; |
| |
| Opensearch2DynamicSink( |
| EncodingFormat<SerializationSchema<RowData>> format, |
| OpensearchConfiguration config, |
| List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex, |
| DataType physicalRowDataType, |
| String summaryString, |
| ZoneId localTimeZoneId) { |
| this.format = checkNotNull(format); |
| this.physicalRowDataType = checkNotNull(physicalRowDataType); |
| this.primaryKeyLogicalTypesWithIndex = checkNotNull(primaryKeyLogicalTypesWithIndex); |
| this.config = checkNotNull(config); |
| this.summaryString = checkNotNull(summaryString); |
| this.localTimeZoneId = localTimeZoneId; |
| this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime(); |
| } |
| |
| public boolean isDynamicIndexWithSystemTime() { |
| IndexGeneratorFactory.IndexHelper indexHelper = new IndexGeneratorFactory.IndexHelper(); |
| return indexHelper.checkIsDynamicIndexWithSystemTimeFormat(config.getIndex()); |
| } |
| |
| Function<RowData, String> createKeyExtractor() { |
| return KeyExtractor.createKeyExtractor( |
| primaryKeyLogicalTypesWithIndex, config.getKeyDelimiter()); |
| } |
| |
| IndexGenerator createIndexGenerator() { |
| return IndexGeneratorFactory.createIndexGenerator( |
| config.getIndex(), |
| DataType.getFieldNames(physicalRowDataType), |
| DataType.getFieldDataTypes(physicalRowDataType), |
| localTimeZoneId); |
| } |
| |
| @Override |
| public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { |
| ChangelogMode.Builder builder = ChangelogMode.newBuilder(); |
| for (RowKind kind : requestedMode.getContainedKinds()) { |
| if (kind != RowKind.UPDATE_BEFORE) { |
| builder.addContainedKind(kind); |
| } |
| } |
| if (isDynamicIndexWithSystemTime && !requestedMode.containsOnly(RowKind.INSERT)) { |
| throw new ValidationException( |
| "Dynamic indexing based on system time only works on append only stream."); |
| } |
| return builder.build(); |
| } |
| |
| @Override |
| public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { |
| SerializationSchema<RowData> format = |
| this.format.createRuntimeEncoder(context, physicalRowDataType); |
| |
| final RowOpensearch2Emitter rowOpensearchEmitter = |
| new RowOpensearch2Emitter( |
| createIndexGenerator(), format, XContentType.JSON, createKeyExtractor()); |
| |
| final Opensearch2SinkBuilder<RowData> builder = new Opensearch2SinkBuilder<>(); |
| builder.setEmitter(rowOpensearchEmitter); |
| builder.setHosts(config.getHosts().toArray(new HttpHost[0])); |
| builder.setDeliveryGuarantee(config.getDeliveryGuarantee()); |
| builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions()); |
| builder.setBulkFlushMaxSizeMb(config.getBulkFlushMaxByteSize().getMebiBytes()); |
| builder.setBulkFlushInterval(config.getBulkFlushInterval()); |
| |
| if (config.getBulkFlushBackoffType().isPresent()) { |
| FlushBackoffType backoffType = config.getBulkFlushBackoffType().get(); |
| int backoffMaxRetries = config.getBulkFlushBackoffRetries().get(); |
| long backoffDelayMs = config.getBulkFlushBackoffDelay().get(); |
| |
| builder.setBulkFlushBackoffStrategy(backoffType, backoffMaxRetries, backoffDelayMs); |
| } |
| |
| if (config.getUsername().isPresent() |
| && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) { |
| builder.setConnectionUsername(config.getUsername().get()); |
| } |
| |
| if (config.getPassword().isPresent() |
| && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) { |
| builder.setConnectionPassword(config.getPassword().get()); |
| } |
| |
| if (config.getPathPrefix().isPresent() |
| && !StringUtils.isNullOrWhitespaceOnly(config.getPathPrefix().get())) { |
| builder.setConnectionPathPrefix(config.getPathPrefix().get()); |
| } |
| |
| if (config.getConnectionRequestTimeout().isPresent()) { |
| builder.setConnectionRequestTimeout( |
| (int) config.getConnectionRequestTimeout().get().getSeconds()); |
| } |
| |
| if (config.getConnectionTimeout().isPresent()) { |
| builder.setConnectionTimeout((int) config.getConnectionTimeout().get().getSeconds()); |
| } |
| |
| if (config.getSocketTimeout().isPresent()) { |
| builder.setSocketTimeout((int) config.getSocketTimeout().get().getSeconds()); |
| } |
| |
| if (config.isAllowInsecure().isPresent()) { |
| builder.setAllowInsecure(config.isAllowInsecure().get()); |
| } |
| |
| return SinkV2Provider.of(builder.build(), config.getParallelism().orElse(null)); |
| } |
| |
| @Override |
| public DynamicTableSink copy() { |
| return new Opensearch2DynamicSink( |
| format, |
| config, |
| primaryKeyLogicalTypesWithIndex, |
| physicalRowDataType, |
| summaryString, |
| localTimeZoneId); |
| } |
| |
| @Override |
| public String asSummaryString() { |
| return summaryString; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| Opensearch2DynamicSink that = (Opensearch2DynamicSink) o; |
| return Objects.equals(format, that.format) |
| && Objects.equals(physicalRowDataType, that.physicalRowDataType) |
| && Objects.equals( |
| primaryKeyLogicalTypesWithIndex, that.primaryKeyLogicalTypesWithIndex) |
| && Objects.equals(config, that.config) |
| && Objects.equals(summaryString, that.summaryString); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash( |
| format, |
| physicalRowDataType, |
| primaryKeyLogicalTypesWithIndex, |
| config, |
| summaryString); |
| } |
| } |