| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iceberg.flink.sink; |
| |
| import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; |
| import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; |
| import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; |
| import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; |
| import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; |
| import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; |
| import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; |
| |
| import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.time.Duration; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.function.Function; |
| import org.apache.flink.api.common.functions.MapFunction; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeinfo.Types; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.ReadableConfig; |
| import org.apache.flink.streaming.api.datastream.DataStream; |
| import org.apache.flink.streaming.api.datastream.DataStreamSink; |
| import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
| import org.apache.flink.streaming.api.functions.sink.DiscardingSink; |
| import org.apache.flink.table.api.TableSchema; |
| import org.apache.flink.table.data.RowData; |
| import org.apache.flink.table.data.util.DataFormatConverters; |
| import org.apache.flink.table.types.DataType; |
| import org.apache.flink.table.types.logical.RowType; |
| import org.apache.flink.types.Row; |
| import org.apache.iceberg.DataFile; |
| import org.apache.iceberg.DistributionMode; |
| import org.apache.iceberg.FileFormat; |
| import org.apache.iceberg.PartitionField; |
| import org.apache.iceberg.PartitionSpec; |
| import org.apache.iceberg.Schema; |
| import org.apache.iceberg.SerializableTable; |
| import org.apache.iceberg.Table; |
| import org.apache.iceberg.flink.FlinkSchemaUtil; |
| import org.apache.iceberg.flink.FlinkWriteConf; |
| import org.apache.iceberg.flink.FlinkWriteOptions; |
| import org.apache.iceberg.flink.TableLoader; |
| import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; |
| import org.apache.iceberg.io.WriteResult; |
| import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; |
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; |
| import org.apache.iceberg.types.TypeUtil; |
| import org.apache.iceberg.util.SerializableSupplier; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class FlinkSink { |
| private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class); |
| |
| private static final String ICEBERG_STREAM_WRITER_NAME = |
| IcebergStreamWriter.class.getSimpleName(); |
| private static final String ICEBERG_FILES_COMMITTER_NAME = |
| IcebergFilesCommitter.class.getSimpleName(); |
| |
| private FlinkSink() {} |
| |
| /** |
| * Initialize a {@link Builder} to export data from a generic input data stream into an Iceberg |
| * table. The sink connector works with {@link RowData} internally, so users need to provide a |
| * mapper function and a {@link TypeInformation} to convert the generic records to a RowData |
| * DataStream. |
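| * |
| * <p>A minimal usage sketch; {@code MyEvent} and its {@code toRowData} converter are |
| * hypothetical placeholders, and {@code rowType} is the Flink {@link RowType} of the output: |
| * |
| * <pre>{@code |
| * DataStream<MyEvent> events = ...; |
| * FlinkSink.builderFor( |
| *         events, |
| *         (MapFunction<MyEvent, RowData>) event -> toRowData(event), |
| *         FlinkCompatibilityUtil.toTypeInfo(rowType)) |
| *     .tableLoader(tableLoader) |
| *     .append(); |
| * }</pre> |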
| * |
| * @param input the generic source input data stream. |
| * @param mapper function to convert the generic data to {@link RowData} |
| * @param outputType the {@link TypeInformation} of the converted {@link RowData} stream. |
| * @param <T> the data type of records. |
| * @return {@link Builder} to connect the iceberg table. |
| */ |
| public static <T> Builder builderFor( |
| DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) { |
| return new Builder().forMapperOutputType(input, mapper, outputType); |
| } |
| |
| /** |
| * Initialize a {@link Builder} to export data from an input data stream of {@link Row}s into an |
| * Iceberg table. The sink connector works with {@link RowData} internally, so users need to |
| * provide a {@link TableSchema} for the builder to convert those {@link Row}s to a {@link RowData} |
| * DataStream. |
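| * |
| * <p>For example (a sketch; the schema and input stream below are placeholders): |
| * |
| * <pre>{@code |
| * TableSchema schema = |
| *     TableSchema.builder() |
| *         .field("id", DataTypes.BIGINT()) |
| *         .field("data", DataTypes.STRING()) |
| *         .build(); |
| * DataStream<Row> rows = ...; |
| * FlinkSink.forRow(rows, schema).tableLoader(tableLoader).append(); |
| * }</pre> |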
| * |
| * @param input the source input data stream with {@link Row}s. |
| * @param tableSchema the {@link TableSchema} of the input data, used to build the row converter. |
| * @return {@link Builder} to connect the iceberg table. |
| */ |
| public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) { |
| RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType(); |
| DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); |
| |
| DataFormatConverters.RowConverter rowConverter = |
| new DataFormatConverters.RowConverter(fieldDataTypes); |
| return builderFor(input, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType)) |
| .tableSchema(tableSchema); |
| } |
| |
| /** |
| * Initialize a {@link Builder} to export data from an input data stream of {@link RowData}s |
| * into an Iceberg table. |
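| * |
| * <p>A typical end-to-end sketch (the table location below is a placeholder): |
| * |
| * <pre>{@code |
| * DataStream<RowData> rowDataStream = ...; |
| * FlinkSink.forRowData(rowDataStream) |
| *     .tableLoader(TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl")) |
| *     .append(); |
| * }</pre> |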
| * |
| * @param input the source input data stream with {@link RowData}s. |
| * @return {@link Builder} to connect the iceberg table. |
| */ |
| public static Builder forRowData(DataStream<RowData> input) { |
| return new Builder().forRowData(input); |
| } |
| |
| public static class Builder { |
| private Function<String, DataStream<RowData>> inputCreator = null; |
| private TableLoader tableLoader; |
| private Table table; |
| private TableSchema tableSchema; |
| private List<String> equalityFieldColumns = null; |
| private String uidPrefix = null; |
| private final Map<String, String> snapshotProperties = Maps.newHashMap(); |
| private ReadableConfig readableConfig = new Configuration(); |
| private final Map<String, String> writeOptions = Maps.newHashMap(); |
| private FlinkWriteConf flinkWriteConf = null; |
| |
| private Builder() {} |
| |
| private Builder forRowData(DataStream<RowData> newRowDataInput) { |
| this.inputCreator = ignored -> newRowDataInput; |
| return this; |
| } |
| |
| private <T> Builder forMapperOutputType( |
| DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) { |
| this.inputCreator = |
| newUidPrefix -> { |
| // Input stream order is crucial for some situations (e.g. the CDC case). Therefore, set the |
| // parallelism of the map operator to match its input so that the map operator chains with its |
| // input instead of being rebalanced by default. |
| SingleOutputStreamOperator<RowData> inputStream = |
| input.map(mapper, outputType).setParallelism(input.getParallelism()); |
| if (newUidPrefix != null) { |
| inputStream.name(operatorName(newUidPrefix)).uid(newUidPrefix + "-mapper"); |
| } |
| return inputStream; |
| }; |
| return this; |
| } |
| |
| /** |
| * This Iceberg {@link Table} instance is used to initialize the {@link IcebergStreamWriter}, |
| * which writes all the records into {@link DataFile}s and emits them to the downstream |
| * operator. Providing a table avoids loading the table separately in each task. |
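| * |
| * <p>For example (a sketch; the loader is typically the same one passed to {@code tableLoader}): |
| * |
| * <pre>{@code |
| * tableLoader.open(); |
| * Table table = tableLoader.loadTable(); |
| * FlinkSink.forRowData(rowDataStream).table(table).tableLoader(tableLoader).append(); |
| * }</pre> |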
| * |
| * @param newTable the loaded iceberg table instance. |
| * @return {@link Builder} to connect the iceberg table. |
| */ |
| public Builder table(Table newTable) { |
| this.table = newTable; |
| return this; |
| } |
| |
| /** |
| * The table loader is used for loading tables lazily in {@link IcebergFilesCommitter}. This |
| * loader is needed because {@link Table} is not serializable, so the table loaded via |
| * Builder#table cannot simply be reused in the remote task managers. |
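| * |
| * <p>For example (a sketch; the {@code catalogLoader} below is a placeholder for however the |
| * catalog is configured): |
| * |
| * <pre>{@code |
| * TableLoader loader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("db", "tbl")); |
| * FlinkSink.forRowData(rowDataStream).tableLoader(loader).append(); |
| * }</pre> |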
| * |
| * @param newTableLoader to load iceberg table inside tasks. |
| * @return {@link Builder} to connect the iceberg table. |
| */ |
| public Builder tableLoader(TableLoader newTableLoader) { |
| this.tableLoader = newTableLoader; |
| return this; |
| } |
| |
| /** |
| * Set the write properties for Flink sink. View the supported properties in {@link |
| * FlinkWriteOptions} |
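| * |
| * <p>For example, using an option key defined in {@link FlinkWriteOptions}: |
| * |
| * <pre>{@code |
| * builder.set(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "hash"); |
| * }</pre> |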
| */ |
| public Builder set(String property, String value) { |
| writeOptions.put(property, value); |
| return this; |
| } |
| |
| /** |
| * Set the write properties for Flink sink. View the supported properties in {@link |
| * FlinkWriteOptions} |
| */ |
| public Builder setAll(Map<String, String> properties) { |
| writeOptions.putAll(properties); |
| return this; |
| } |
| |
| public Builder tableSchema(TableSchema newTableSchema) { |
| this.tableSchema = newTableSchema; |
| return this; |
| } |
| |
| public Builder overwrite(boolean newOverwrite) { |
| writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite)); |
| return this; |
| } |
| |
| public Builder flinkConf(ReadableConfig config) { |
| this.readableConfig = config; |
| return this; |
| } |
| |
| /** |
| * Configure the write {@link DistributionMode} that the Flink sink will use. Currently, the |
| * Flink sink supports {@link DistributionMode#NONE} and {@link DistributionMode#HASH}. |
| * |
| * @param mode to specify the write distribution mode. |
| * @return {@link Builder} to connect the iceberg table. |
| */ |
| public Builder distributionMode(DistributionMode mode) { |
| Preconditions.checkArgument( |
| !DistributionMode.RANGE.equals(mode), |
| "Flink does not support 'range' write distribution mode now."); |
| if (mode != null) { |
| writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName()); |
| } |
| return this; |
| } |
| |
| /** |
| * Configuring the write parallelism for the Iceberg stream writer. |
| * |
| * @param newWriteParallelism the number of parallel Iceberg stream writers. |
| * @return {@link Builder} to connect the iceberg table. |
| */ |
| public Builder writeParallelism(int newWriteParallelism) { |
| writeOptions.put( |
| FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism)); |
| return this; |
| } |
| |
| /** |
| * All INSERT/UPDATE_AFTER events from the input stream will be transformed to UPSERT events, |
| * which means the old records are DELETEd and then the new records are INSERTed. For a |
| * partitioned table, the partition fields should be a subset of the equality fields; otherwise |
| * an old row located in partition-A could not be deleted by a new row located in partition-B. |
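| * |
| * <p>Upsert sketch, assuming the table's row identity is the {@code id} column: |
| * |
| * <pre>{@code |
| * FlinkSink.forRowData(rowDataStream) |
| *     .tableLoader(tableLoader) |
| *     .equalityFieldColumns(Arrays.asList("id")) |
| *     .upsert(true) |
| *     .append(); |
| * }</pre> |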
| * |
| * @param enabled indicates whether to transform all INSERT/UPDATE_AFTER events to UPSERT. |
| * @return {@link Builder} to connect the iceberg table. |
| */ |
| public Builder upsert(boolean enabled) { |
| writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled)); |
| return this; |
| } |
| |
| /** |
| * Configuring the equality field columns for an Iceberg table that accepts CDC or UPSERT events. |
| * |
| * @param columns defines the Iceberg table's key. |
| * @return {@link Builder} to connect the iceberg table. |
| */ |
| public Builder equalityFieldColumns(List<String> columns) { |
| this.equalityFieldColumns = columns; |
| return this; |
| } |
| |
| /** |
| * Set the uid prefix for FlinkSink operators. Note that FlinkSink internally consists of |
| * multiple operators (like writer, committer, dummy sink etc.). The actual operator uid will be |
| * the prefix with a suffix appended, like "uidPrefix-writer". <br> |
| * <br> |
| * If provided, this prefix is also applied to operator names. <br> |
| * <br> |
| * Flink auto-generates operator uids if not set explicitly. It is a recommended <a |
| * href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/"> |
| * best practice to set uids for all operators</a> before deploying to production. Flink offers |
| * the option {@code pipeline.auto-generate-uid=false} to disable auto-generation and force |
| * explicit setting of all operator uids. <br> |
| * <br> |
| * Be careful with setting this for an existing job, because it changes the operator uid from an |
| * auto-generated one to this new value. When deploying the change with a checkpoint, Flink |
| * won't be able to restore the previous Flink sink operator state (more specifically the |
| * committer operator state). You need to use {@code --allowNonRestoredState} to ignore the |
| * previous sink state. During restore, the Flink sink state is used to check whether the last |
| * commit was actually successful, so {@code --allowNonRestoredState} can lead to data loss if |
| * the Iceberg commit failed in the last completed checkpoint. |
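| * |
| * <p>For example, with {@code uidPrefix("iceberg-sink")} the operator uids become |
| * "iceberg-sink-writer", "iceberg-sink-committer" and so on: |
| * |
| * <pre>{@code |
| * FlinkSink.forRowData(rowDataStream) |
| *     .tableLoader(tableLoader) |
| *     .uidPrefix("iceberg-sink") |
| *     .append(); |
| * }</pre> |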
| * |
| * @param newPrefix prefix for Flink sink operator uid and name |
| * @return {@link Builder} to connect the iceberg table. |
| */ |
| public Builder uidPrefix(String newPrefix) { |
| this.uidPrefix = newPrefix; |
| return this; |
| } |
| |
| public Builder setSnapshotProperties(Map<String, String> properties) { |
| snapshotProperties.putAll(properties); |
| return this; |
| } |
| |
| public Builder setSnapshotProperty(String property, String value) { |
| snapshotProperties.put(property, value); |
| return this; |
| } |
| |
| public Builder toBranch(String branch) { |
| writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch); |
| return this; |
| } |
| |
| private <T> DataStreamSink<T> chainIcebergOperators() { |
| Preconditions.checkArgument( |
| inputCreator != null, |
| "Please use forRowData() or forMapperOutputType() to initialize the input DataStream."); |
| Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null"); |
| |
| DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix); |
| |
| if (table == null) { |
| if (!tableLoader.isOpen()) { |
| tableLoader.open(); |
| } |
| |
| try (TableLoader loader = tableLoader) { |
| this.table = loader.loadTable(); |
| } catch (IOException e) { |
| throw new UncheckedIOException( |
| "Failed to load iceberg table from table loader: " + tableLoader, e); |
| } |
| } |
| |
| flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig); |
| |
| // Find out the equality field id list based on the user-provided equality field column names. |
| List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds(); |
| |
| // Convert the requested flink table schema to flink row type. |
| RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema); |
| |
| // Distribute the records from input data stream based on the write.distribution-mode and |
| // equality fields. |
| DataStream<RowData> distributeStream = |
| distributeDataStream( |
| rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType); |
| |
| // Add parallel writers that append rows to files |
| SingleOutputStreamOperator<WriteResult> writerStream = |
| appendWriter(distributeStream, flinkRowType, equalityFieldIds); |
| |
| // Add single-parallelism committer that commits files |
| // after successful checkpoint or end of input |
| SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream); |
| |
| // Add dummy discard sink |
| return appendDummySink(committerStream); |
| } |
| |
| /** |
| * Append the iceberg sink operators to write records to iceberg table. |
| * |
| * @return {@link DataStreamSink} for sink. |
| */ |
| public DataStreamSink<Void> append() { |
| return chainIcebergOperators(); |
| } |
| |
| private String operatorName(String suffix) { |
| return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; |
| } |
| |
| @VisibleForTesting |
| List<Integer> checkAndGetEqualityFieldIds() { |
| List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds()); |
| if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) { |
| Set<Integer> equalityFieldSet = |
| Sets.newHashSetWithExpectedSize(equalityFieldColumns.size()); |
| for (String column : equalityFieldColumns) { |
| org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column); |
| Preconditions.checkNotNull( |
| field, |
| "Missing required equality field column '%s' in table schema %s", |
| column, |
| table.schema()); |
| equalityFieldSet.add(field.fieldId()); |
| } |
| |
| if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) { |
| LOG.warn( |
| "The configured equality field column IDs {} are not matched with the schema identifier field IDs" |
| + " {}, use job specified equality field columns as the equality fields by default.", |
| equalityFieldSet, |
| table.schema().identifierFieldIds()); |
| } |
| equalityFieldIds = Lists.newArrayList(equalityFieldSet); |
| } |
| return equalityFieldIds; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <T> DataStreamSink<T> appendDummySink( |
| SingleOutputStreamOperator<Void> committerStream) { |
| DataStreamSink<T> resultStream = |
| committerStream |
| .addSink(new DiscardingSink()) |
| .name(operatorName(String.format("IcebergSink %s", this.table.name()))) |
| .setParallelism(1); |
| if (uidPrefix != null) { |
| resultStream = resultStream.uid(uidPrefix + "-dummysink"); |
| } |
| return resultStream; |
| } |
| |
| private SingleOutputStreamOperator<Void> appendCommitter( |
| SingleOutputStreamOperator<WriteResult> writerStream) { |
| IcebergFilesCommitter filesCommitter = |
| new IcebergFilesCommitter( |
| tableLoader, |
| flinkWriteConf.overwriteMode(), |
| snapshotProperties, |
| flinkWriteConf.workerPoolSize(), |
| flinkWriteConf.branch(), |
| table.spec()); |
| SingleOutputStreamOperator<Void> committerStream = |
| writerStream |
| .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter) |
| .setParallelism(1) |
| .setMaxParallelism(1); |
| if (uidPrefix != null) { |
| committerStream = committerStream.uid(uidPrefix + "-committer"); |
| } |
| return committerStream; |
| } |
| |
| private SingleOutputStreamOperator<WriteResult> appendWriter( |
| DataStream<RowData> input, RowType flinkRowType, List<Integer> equalityFieldIds) { |
| // Validate the equality fields and partition fields if we enable the upsert mode. |
| if (flinkWriteConf.upsertMode()) { |
| Preconditions.checkState( |
| !flinkWriteConf.overwriteMode(), |
| "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream."); |
| Preconditions.checkState( |
| !equalityFieldIds.isEmpty(), |
| "Equality field columns shouldn't be empty when configuring to use UPSERT data stream."); |
| if (!table.spec().isUnpartitioned()) { |
| for (PartitionField partitionField : table.spec().fields()) { |
| Preconditions.checkState( |
| equalityFieldIds.contains(partitionField.sourceId()), |
| "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'", |
| partitionField, |
| equalityFieldColumns); |
| } |
| } |
| } |
| |
| SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table); |
| Duration tableRefreshInterval = flinkWriteConf.tableRefreshInterval(); |
| |
| SerializableSupplier<Table> tableSupplier; |
| if (tableRefreshInterval != null) { |
| tableSupplier = |
| new CachingTableSupplier(serializableTable, tableLoader, tableRefreshInterval); |
| } else { |
| tableSupplier = () -> serializableTable; |
| } |
| |
| IcebergStreamWriter<RowData> streamWriter = |
| createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds); |
| |
| int parallelism = |
| flinkWriteConf.writeParallelism() == null |
| ? input.getParallelism() |
| : flinkWriteConf.writeParallelism(); |
| SingleOutputStreamOperator<WriteResult> writerStream = |
| input |
| .transform( |
| operatorName(ICEBERG_STREAM_WRITER_NAME), |
| TypeInformation.of(WriteResult.class), |
| streamWriter) |
| .setParallelism(parallelism); |
| if (uidPrefix != null) { |
| writerStream = writerStream.uid(uidPrefix + "-writer"); |
| } |
| return writerStream; |
| } |
| |
| private DataStream<RowData> distributeDataStream( |
| DataStream<RowData> input, |
| List<Integer> equalityFieldIds, |
| PartitionSpec partitionSpec, |
| Schema iSchema, |
| RowType flinkRowType) { |
| DistributionMode writeMode = flinkWriteConf.distributionMode(); |
| |
| LOG.info("Write distribution mode is '{}'", writeMode.modeName()); |
| switch (writeMode) { |
| case NONE: |
| if (equalityFieldIds.isEmpty()) { |
| return input; |
| } else { |
| LOG.info("Distribute rows by equality fields, because there are equality fields set"); |
| return input.keyBy( |
| new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds)); |
| } |
| |
| case HASH: |
| if (equalityFieldIds.isEmpty()) { |
| if (partitionSpec.isUnpartitioned()) { |
| LOG.warn( |
| "Fallback to use 'none' distribution mode, because there are no equality fields set " |
| + "and table is unpartitioned"); |
| return input; |
| } else { |
| return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); |
| } |
| } else { |
| if (partitionSpec.isUnpartitioned()) { |
| LOG.info( |
| "Distribute rows by equality fields, because there are equality fields set " |
| + "and table is unpartitioned"); |
| return input.keyBy( |
| new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds)); |
| } else { |
| for (PartitionField partitionField : partitionSpec.fields()) { |
| Preconditions.checkState( |
| equalityFieldIds.contains(partitionField.sourceId()), |
| "In 'hash' distribution mode with equality fields set, partition field '%s' " |
| + "should be included in equality fields: '%s'", |
| partitionField, |
| equalityFieldColumns); |
| } |
| return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); |
| } |
| } |
| |
| case RANGE: |
| if (equalityFieldIds.isEmpty()) { |
| LOG.warn( |
| "Fallback to use 'none' distribution mode, because there are no equality fields set " |
| + "and {}=range is not supported yet in flink", |
| WRITE_DISTRIBUTION_MODE); |
| return input; |
| } else { |
| LOG.info( |
| "Distribute rows by equality fields, because there are equality fields set " |
| + "and{}=range is not supported yet in flink", |
| WRITE_DISTRIBUTION_MODE); |
| return input.keyBy( |
| new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds)); |
| } |
| |
| default: |
| throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode); |
| } |
| } |
| } |
| |
| static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) { |
| if (requestedSchema != null) { |
| // Convert the Flink schema to an Iceberg schema first, then reassign ids to match the existing |
| // Iceberg schema. |
| Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema); |
| TypeUtil.validateWriteSchema(schema, writeSchema, true, true); |
| |
| // We use this Flink schema to read values from RowData. Flink's TINYINT and SMALLINT are |
| // promoted to Iceberg INTEGER, which means that if we used Iceberg's table schema to read a |
| // TINYINT (backed by 1 byte), we would read 4 bytes rather than 1 byte and mess up the byte |
| // array in BinaryRowData. So here we must use the Flink schema. |
| return (RowType) requestedSchema.toRowDataType().getLogicalType(); |
| } else { |
| return FlinkSchemaUtil.convert(schema); |
| } |
| } |
| |
| static IcebergStreamWriter<RowData> createStreamWriter( |
| SerializableSupplier<Table> tableSupplier, |
| FlinkWriteConf flinkWriteConf, |
| RowType flinkRowType, |
| List<Integer> equalityFieldIds) { |
| Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null"); |
| |
| Table initTable = tableSupplier.get(); |
| FileFormat format = flinkWriteConf.dataFileFormat(); |
| TaskWriterFactory<RowData> taskWriterFactory = |
| new RowDataTaskWriterFactory( |
| tableSupplier, |
| flinkRowType, |
| flinkWriteConf.targetDataFileSize(), |
| format, |
| writeProperties(initTable, format, flinkWriteConf), |
| equalityFieldIds, |
| flinkWriteConf.upsertMode()); |
| |
| return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory); |
| } |
| |
| /** |
| * Based on the {@link FileFormat}, overrides the table-level compression properties for the |
| * write. |
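| * |
| * <p>A sketch of the effect for Parquet; the property keys are the statically imported Iceberg |
| * table properties at the top of this file: |
| * |
| * <pre>{@code |
| * Map<String, String> props = writeProperties(table, FileFormat.PARQUET, conf); |
| * // props starts from table.properties(); PARQUET_COMPRESSION (and, if set, |
| * // PARQUET_COMPRESSION_LEVEL) are then overridden from the Flink write configuration |
| * }</pre> |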
| * |
| * @param table The table to get the table level settings |
| * @param format The FileFormat to use |
| * @param conf The write configuration |
| * @return The properties to use for writing |
| */ |
| private static Map<String, String> writeProperties( |
| Table table, FileFormat format, FlinkWriteConf conf) { |
| Map<String, String> writeProperties = Maps.newHashMap(table.properties()); |
| |
| switch (format) { |
| case PARQUET: |
| writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); |
| String parquetCompressionLevel = conf.parquetCompressionLevel(); |
| if (parquetCompressionLevel != null) { |
| writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); |
| } |
| |
| break; |
| case AVRO: |
| writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); |
| String avroCompressionLevel = conf.avroCompressionLevel(); |
| if (avroCompressionLevel != null) { |
| writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel); |
| } |
| |
| break; |
| case ORC: |
| writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); |
| writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); |
| break; |
| default: |
| throw new IllegalArgumentException(String.format("Unknown file format %s", format)); |
| } |
| |
| return writeProperties; |
| } |
| } |