Flink: backport PR #10859 for range distribution (#10990)
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
index 7167859..d5eea67 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -53,6 +53,10 @@
return new LongConfParser();
}
+ public DoubleConfParser doubleConf() {
+ return new DoubleConfParser();
+ }
+
public <E extends Enum<E>> EnumConfParser<E> enumConfParser(Class<E> enumClass) {
return new EnumConfParser<>(enumClass);
}
@@ -135,6 +139,29 @@
}
}
+ class DoubleConfParser extends ConfParser<DoubleConfParser, Double> {
+ private Double defaultValue;
+
+ @Override
+ protected DoubleConfParser self() {
+ return this;
+ }
+
+ public DoubleConfParser defaultValue(double value) {
+ this.defaultValue = value;
+ return self();
+ }
+
+ public double parse() {
+ Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+ return parse(Double::parseDouble, defaultValue);
+ }
+
+ public Double parseOptional() {
+ return parse(Double::parseDouble, null);
+ }
+ }
+
class StringConfParser extends ConfParser<StringConfParser, String> {
private String defaultValue;
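
The new DoubleConfParser follows the same fluent contract as the existing long and string parsers. A minimal sketch of that contract, assuming access to a FlinkConfParser instance (in practice the parser is constructed internally by FlinkWriteConf, so this is illustrative only):

    // parse() requires a non-null default and returns a primitive double;
    // parseOptional() returns null when no value is configured through any of the parser's sources.
    double baseWeight =
        confParser
            .doubleConf()
            .option("range-distribution-sort-key-base-weight") // key defined in FlinkWriteOptions below
            .defaultValue(0.0d)
            .parse();
    Double maybeWeight =
        confParser.doubleConf().option("range-distribution-sort-key-base-weight").parseOptional();
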
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index ca7b112..a31902d 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -26,6 +26,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
/**
* A class for common Iceberg configs for Flink writes.
@@ -167,6 +168,26 @@
return DistributionMode.fromName(modeName);
}
+ public StatisticsType rangeDistributionStatisticsType() {
+ String name =
+ confParser
+ .stringConf()
+ .option(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key())
+ .flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE)
+ .defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.defaultValue())
+ .parse();
+ return StatisticsType.valueOf(name);
+ }
+
+ public double rangeDistributionSortKeyBaseWeight() {
+ return confParser
+ .doubleConf()
+ .option(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key())
+ .flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT)
+ .defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.defaultValue())
+ .parse();
+ }
+
public int workerPoolSize() {
return confParser
.intConf()
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index df73f2e..c352867 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -23,6 +23,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
/** Flink sink write options */
public class FlinkWriteOptions {
@@ -60,6 +61,19 @@
public static final ConfigOption<String> DISTRIBUTION_MODE =
ConfigOptions.key("distribution-mode").stringType().noDefaultValue();
+ public static final ConfigOption<String> RANGE_DISTRIBUTION_STATISTICS_TYPE =
+ ConfigOptions.key("range-distribution-statistics-type")
+ .stringType()
+ .defaultValue(StatisticsType.Auto.name())
+ .withDescription("Type of statistics collection: Auto, Map, Sketch");
+
+ public static final ConfigOption<Double> RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT =
+ ConfigOptions.key("range-distribution-sort-key-base-weight")
+ .doubleType()
+ .defaultValue(0.0d)
+ .withDescription(
+ "Base weight for every sort key relative to target weight per writer task");
+
// Branch to write to
public static final ConfigOption<String> BRANCH =
ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 769af7d..2256d1e 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -53,13 +53,19 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperatorFactory;
+import org.apache.iceberg.flink.sink.shuffle.RangePartitioner;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -233,9 +239,6 @@
* @return {@link Builder} to connect the iceberg table.
*/
public Builder distributionMode(DistributionMode mode) {
- Preconditions.checkArgument(
- !DistributionMode.RANGE.equals(mode),
- "Flink does not support 'range' write distribution mode now.");
if (mode != null) {
writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
}
@@ -243,6 +246,62 @@
}
/**
+ * Range distribution needs to collect statistics about the data distribution in order to
+ * shuffle records in a relatively balanced way. In general, low-cardinality sort keys should
+ * use {@link StatisticsType#Map} and high-cardinality sort keys should use {@link
+ * StatisticsType#Sketch}. Refer to the {@link StatisticsType} Javadoc for more details.
+ *
+ * <p>Default is {@link StatisticsType#Auto}, where Map statistics are used initially. If the
+ * cardinality grows beyond the threshold (currently 10K) defined in {@code
+ * SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD}, statistics collection automatically switches to
+ * sketch reservoir sampling.
+ *
+ * <p>Explicitly set the statistics type if the default behavior doesn't work.
+ *
+ * @param type to specify the statistics type for range distribution.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public Builder rangeDistributionStatisticsType(StatisticsType type) {
+ if (type != null) {
+ writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name());
+ }
+ return this;
+ }
+
+ /**
+ * If the sort order contains partition columns, each sort key maps to one partition and data
+ * file. This relative weight helps avoid creating too many small files for sort keys with low
+ * traffic. It is a double value that defines the minimal weight for each sort key. `0.02` means
+ * each key has a base weight of `2%` of the targeted traffic weight per writer task.
+ *
+ * <p>E.g. the sink Iceberg table is partitioned daily by event time. Assume the data stream
+ * contains events from now up to 180 days ago. With event time, the traffic weight distribution
+ * across different days typically has a long-tail pattern: the current day contains the most
+ * traffic, and the older days (the long tail) contain less and less traffic. Assume writer
+ * parallelism is `10` and the total weight across all 180 days is `10,000`. The target traffic
+ * weight per writer task would be `1,000`. Assume the weight sum for the oldest 150 days is
+ * `1,000`. Normally, the range partitioner would put all of the oldest 150 days in one writer
+ * task. That writer task would write 150 small files (one per day). Keeping 150 open files can
+ * potentially consume a large amount of memory, and flushing and uploading 150 files (however
+ * small) at checkpoint time can also be slow. If this config is set to `0.02`, every sort key
+ * has a base weight of `2%` of the targeted weight of `1,000` for every writer task. That
+ * essentially avoids placing more than `50` data files (one per day) on one writer task, no
+ * matter how small they are.
+ *
+ * <p>This is only applicable to {@link StatisticsType#Map} for the low-cardinality scenario.
+ * With {@link StatisticsType#Sketch}, high-cardinality sort columns are usually not used as
+ * partition columns; otherwise, too many partitions and small files would be generated during
+ * write. The sketch range partitioner simply splits high-cardinality keys into ordered ranges.
+ *
+ * <p>Default is {@code 0.0} (no base weight).
+ */
+ public Builder rangeDistributionSortKeyBaseWeight(double weight) {
+ writeOptions.put(
+ FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key(), Double.toString(weight));
+ return this;
+ }
+
+ /**
* Configuring the write parallel number for iceberg stream writer.
*
* @param newWriteParallelism the number of parallel iceberg stream writer.
@@ -349,18 +408,20 @@
// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
- // Convert the requested flink table schema to flink row type.
RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+ int writerParallelism =
+ flinkWriteConf.writeParallelism() == null
+ ? rowDataInput.getParallelism()
+ : flinkWriteConf.writeParallelism();
// Distribute the records from input data stream based on the write.distribution-mode and
// equality fields.
DataStream<RowData> distributeStream =
- distributeDataStream(
- rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
+ distributeDataStream(rowDataInput, equalityFieldIds, flinkRowType, writerParallelism);
// Add parallel writers that append rows to files
SingleOutputStreamOperator<WriteResult> writerStream =
- appendWriter(distributeStream, flinkRowType, equalityFieldIds);
+ appendWriter(distributeStream, flinkRowType, equalityFieldIds, writerParallelism);
// Add single-parallelism committer that commits files
// after successful checkpoint or end of input
@@ -447,7 +508,10 @@
}
private SingleOutputStreamOperator<WriteResult> appendWriter(
- DataStream<RowData> input, RowType flinkRowType, List<Integer> equalityFieldIds) {
+ DataStream<RowData> input,
+ RowType flinkRowType,
+ List<Integer> equalityFieldIds,
+ int writerParallelism) {
// Validate the equality fields and partition fields if we enable the upsert mode.
if (flinkWriteConf.upsertMode()) {
Preconditions.checkState(
@@ -481,17 +545,13 @@
IcebergStreamWriter<RowData> streamWriter =
createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds);
- int parallelism =
- flinkWriteConf.writeParallelism() == null
- ? input.getParallelism()
- : flinkWriteConf.writeParallelism();
SingleOutputStreamOperator<WriteResult> writerStream =
input
.transform(
operatorName(ICEBERG_STREAM_WRITER_NAME),
TypeInformation.of(WriteResult.class),
streamWriter)
- .setParallelism(parallelism);
+ .setParallelism(writerParallelism);
if (uidPrefix != null) {
writerStream = writerStream.uid(uidPrefix + "-writer");
}
@@ -501,12 +561,15 @@
private DataStream<RowData> distributeDataStream(
DataStream<RowData> input,
List<Integer> equalityFieldIds,
- PartitionSpec partitionSpec,
- Schema iSchema,
- RowType flinkRowType) {
+ RowType flinkRowType,
+ int writerParallelism) {
DistributionMode writeMode = flinkWriteConf.distributionMode();
-
LOG.info("Write distribution mode is '{}'", writeMode.modeName());
+
+ Schema iSchema = table.schema();
+ PartitionSpec partitionSpec = table.spec();
+ SortOrder sortOrder = table.sortOrder();
+
switch (writeMode) {
case NONE:
if (equalityFieldIds.isEmpty()) {
@@ -548,21 +611,52 @@
}
case RANGE:
- if (equalityFieldIds.isEmpty()) {
+ // Ideally, an exception should be thrown for the combination of range distribution and
+ // equality fields. The primary key case should use hash distribution mode.
+ // Keep the current behavior of falling back to keyBy for backward compatibility.
+ if (!equalityFieldIds.isEmpty()) {
LOG.warn(
- "Fallback to use 'none' distribution mode, because there are no equality fields set "
- + "and {}=range is not supported yet in flink",
- WRITE_DISTRIBUTION_MODE);
- return input;
- } else {
- LOG.info(
- "Distribute rows by equality fields, because there are equality fields set "
- + "and{}=range is not supported yet in flink",
+ "Hash distribute rows by equality fields, even though {}=range is set. "
+ + "Range distribution for primary keys are not always safe in "
+ + "Flink streaming writer.",
WRITE_DISTRIBUTION_MODE);
return input.keyBy(
new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
}
+ // Range distribute by the sort key if the table has a SortOrder, otherwise by partition key
+ Preconditions.checkState(
+ sortOrder.isSorted() || partitionSpec.isPartitioned(),
+ "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+ if (sortOrder.isUnsorted()) {
+ sortOrder = Partitioning.sortOrderFor(partitionSpec);
+ LOG.info("Construct sort order from partition spec");
+ }
+
+ LOG.info("Range distribute rows by sort order: {}", sortOrder);
+ StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
+ SingleOutputStreamOperator<StatisticsOrRecord> shuffleStream =
+ input
+ .transform(
+ operatorName("range-shuffle"),
+ TypeInformation.of(StatisticsOrRecord.class),
+ new DataStatisticsOperatorFactory(
+ iSchema,
+ sortOrder,
+ writerParallelism,
+ statisticsType,
+ flinkWriteConf.rangeDistributionSortKeyBaseWeight()))
+ // Set the parallelism to match the input operator to encourage chaining
+ .setParallelism(input.getParallelism());
+ if (uidPrefix != null) {
+ shuffleStream = shuffleStream.uid(uidPrefix + "-shuffle");
+ }
+
+ return shuffleStream
+ .partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r)
+ .filter(StatisticsOrRecord::hasRecord)
+ .map(StatisticsOrRecord::record);
+
default:
throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
}
@@ -577,12 +671,9 @@
TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
// We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will
- // be promoted to
- // iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1
- // 'byte'), we will
- // read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here
- // we must use flink
- // schema.
+ // be promoted to iceberg INTEGER, which means that if we use iceberg's table schema to read
+ // TINYINT (backed by 1 'byte'), we will read 4 bytes rather than 1 byte, and that will mess up
+ // the byte array in BinaryRowData. So here we must use the flink schema.
return (RowType) requestedSchema.toRowDataType().getLogicalType();
} else {
return FlinkSchemaUtil.convert(schema);
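
With the builder guard removed, range distribution can be requested directly on the DataStream sink. A minimal usage sketch, assuming the target table already defines a sort order or a partition spec (otherwise the precondition in distributeDataStream fails); rowDataStream and tableLoader are caller-supplied placeholders:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.DistributionMode;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;
    import org.apache.iceberg.flink.sink.shuffle.StatisticsType;

    public class RangeDistributedSinkExample {
      static void appendRangeDistributedSink(DataStream<RowData> rowDataStream, TableLoader tableLoader) {
        FlinkSink.forRowData(rowDataStream)
            .tableLoader(tableLoader)
            .writeParallelism(4)
            .distributionMode(DistributionMode.RANGE)             // no longer rejected by the builder
            .rangeDistributionStatisticsType(StatisticsType.Auto) // or force Map / Sketch explicitly
            .rangeDistributionSortKeyBaseWeight(0.02)             // 2% base weight per sort key
            .append();
      }
    }
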
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java
new file mode 100644
index 0000000..dc147bf
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+
+@Internal
+public class DataStatisticsOperatorFactory extends AbstractStreamOperatorFactory<StatisticsOrRecord>
+ implements CoordinatedOperatorFactory<StatisticsOrRecord>,
+ OneInputStreamOperatorFactory<RowData, StatisticsOrRecord> {
+
+ private final Schema schema;
+ private final SortOrder sortOrder;
+ private final int downstreamParallelism;
+ private final StatisticsType type;
+ private final double closeFileCostWeightPercentage;
+
+ public DataStatisticsOperatorFactory(
+ Schema schema,
+ SortOrder sortOrder,
+ int downstreamParallelism,
+ StatisticsType type,
+ double closeFileCostWeightPercentage) {
+ this.schema = schema;
+ this.sortOrder = sortOrder;
+ this.downstreamParallelism = downstreamParallelism;
+ this.type = type;
+ this.closeFileCostWeightPercentage = closeFileCostWeightPercentage;
+ }
+
+ @Override
+ public OperatorCoordinator.Provider getCoordinatorProvider(
+ String operatorName, OperatorID operatorID) {
+ return new DataStatisticsCoordinatorProvider(
+ operatorName,
+ operatorID,
+ schema,
+ sortOrder,
+ downstreamParallelism,
+ type,
+ closeFileCostWeightPercentage);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends StreamOperator<StatisticsOrRecord>> T createStreamOperator(
+ StreamOperatorParameters<StatisticsOrRecord> parameters) {
+ OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+ String operatorName = parameters.getStreamConfig().getOperatorName();
+ OperatorEventGateway gateway =
+ parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+
+ DataStatisticsOperator rangeStatisticsOperator =
+ new DataStatisticsOperator(
+ operatorName, schema, sortOrder, gateway, downstreamParallelism, type);
+
+ rangeStatisticsOperator.setup(
+ parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+ parameters
+ .getOperatorEventDispatcher()
+ .registerEventHandler(operatorId, rangeStatisticsOperator);
+
+ return (T) rangeStatisticsOperator;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+ return DataStatisticsOperator.class;
+ }
+}
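
The closeFileCostWeightPercentage passed to this factory is the sort-key base weight documented on FlinkSink.Builder above. A worked-number sketch of the 180-day example from that Javadoc, showing how a 0.02 base weight caps the number of tiny files per writer task (illustrative arithmetic only; the actual weighting is applied inside the map-statistics range partitioner):

    public class BaseWeightArithmetic {
      public static void main(String[] args) {
        double totalWeight = 10_000;                 // total traffic weight across 180 daily partitions
        int writerParallelism = 10;
        double targetWeightPerTask = totalWeight / writerParallelism;       // 1,000

        double baseWeightFraction = 0.02;            // range-distribution-sort-key-base-weight
        double baseWeightPerKey = baseWeightFraction * targetWeightPerTask; // 20 per sort key

        // Every sort key counts for at least 20 toward a task's budget of 1,000, so one task can
        // hold at most ~50 low-traffic keys, i.e. ~50 small files instead of 150.
        long maxLowTrafficKeysPerTask = Math.round(targetWeightPerTask / baseWeightPerKey);
        System.out.println(maxLowTrafficKeysPerTask); // 50
      }
    }
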
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
index 75e397d..df8c3c7 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
@@ -20,28 +20,37 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -177,4 +186,309 @@
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid distribution mode: UNRECOGNIZED");
}
+
+ @TestTemplate
+ public void testRangeDistributionWithoutSortOrderUnpartitioned() throws Exception {
+ assumeThat(partitioned).isFalse();
+
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+ .commit();
+
+ int numOfCheckpoints = 6;
+ DataStream<Row> dataStream =
+ env.addSource(
+ createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+ ROW_TYPE_INFO);
+ FlinkSink.Builder builder =
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism);
+
+ // Range distribution requires either sort order or partition spec defined
+ assertThatThrownBy(builder::append)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+ }
+
+ @TestTemplate
+ public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception {
+ assumeThat(partitioned).isTrue();
+
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+ .commit();
+
+ int numOfCheckpoints = 6;
+ DataStream<Row> dataStream =
+ env.addSource(
+ createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+ ROW_TYPE_INFO);
+ FlinkSink.Builder builder =
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism);
+
+ // sort based on partition columns
+ builder.append();
+ env.execute(getClass().getSimpleName());
+
+ table.refresh();
+ // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+ // only keep the snapshots with added data files
+ snapshots =
+ snapshots.stream()
+ .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+ .collect(Collectors.toList());
+
+ // Sometimes we will have more checkpoints than the bounded source if we pass the
+ // auto checkpoint interval, thus producing multiple snapshots.
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+ }
+
+ @TestTemplate
+ public void testRangeDistributionWithSortOrder() throws Exception {
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+ .commit();
+ table.replaceSortOrder().asc("data").commit();
+
+ int numOfCheckpoints = 6;
+ DataStream<Row> dataStream =
+ env.addSource(
+ createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+ ROW_TYPE_INFO);
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism)
+ .rangeDistributionStatisticsType(StatisticsType.Map)
+ .append();
+ env.execute(getClass().getSimpleName());
+
+ table.refresh();
+ // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+ // only keep the snapshots with added data files
+ snapshots =
+ snapshots.stream()
+ .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+ .collect(Collectors.toList());
+
+ // Sometimes we will have more checkpoints than the bounded source if we pass the
+ // auto checkpoint interval, thus producing multiple snapshots.
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+ // It takes 2 checkpoint cycles for statistics collection and application
+ // of the globally aggregated statistics in the range partitioner.
+ // The last two checkpoints should have range shuffle applied
+ List<Snapshot> rangePartitionedCycles =
+ snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+ if (partitioned) {
+ for (Snapshot snapshot : rangePartitionedCycles) {
+ List<DataFile> addedDataFiles =
+ Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+ // up to 26 partitions
+ assertThat(addedDataFiles).hasSizeLessThanOrEqualTo(26);
+ }
+ } else {
+ for (Snapshot snapshot : rangePartitionedCycles) {
+ List<DataFile> addedDataFiles =
+ Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+ // each writer task should only write one file for non-partition sort column
+ assertThat(addedDataFiles).hasSize(parallelism);
+ // verify there is no overlap in min-max stats range
+ if (parallelism == 2) {
+ assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+ }
+ }
+ }
+ }
+
+ @TestTemplate
+ public void testRangeDistributionSketchWithSortOrder() throws Exception {
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+ .commit();
+ table.replaceSortOrder().asc("id").commit();
+
+ int numOfCheckpoints = 6;
+ DataStream<Row> dataStream =
+ env.addSource(
+ createRangeDistributionBoundedSource(createIntRows(numOfCheckpoints, 1_000)),
+ ROW_TYPE_INFO);
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism)
+ .rangeDistributionStatisticsType(StatisticsType.Sketch)
+ .append();
+ env.execute(getClass().getSimpleName());
+
+ table.refresh();
+ // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+ // only keep the snapshots with added data files
+ snapshots =
+ snapshots.stream()
+ .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+ .collect(Collectors.toList());
+
+ // Sometimes we will have more checkpoints than the bounded source if we pass the
+ // auto checkpoint interval, thus producing multiple snapshots.
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+ // It takes 2 checkpoint cycles for statistics collection and application
+ // of the globally aggregated statistics in the range partitioner.
+ // The last two checkpoints should have range shuffle applied
+ List<Snapshot> rangePartitionedCycles =
+ snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+ // Since the input has a single value for the data column, all rows land in the
+ // same partition. Hence there is no difference between the partitioned and
+ // unpartitioned cases.
+ for (Snapshot snapshot : rangePartitionedCycles) {
+ List<DataFile> addedDataFiles =
+ Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+ // each writer task should only write one file for non-partition sort column
+ assertThat(addedDataFiles).hasSize(parallelism);
+ // verify there is no overlap in min-max stats range
+ if (parallelism == 2) {
+ assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+ }
+ }
+ }
+
+ /** Test migration from Map stats to Sketch stats */
+ @TestTemplate
+ public void testRangeDistributionStatisticsMigration() throws Exception {
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+ .commit();
+ table.replaceSortOrder().asc("id").commit();
+
+ int numOfCheckpoints = 4;
+ List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+ for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+ // Every checkpoint after the first emits 11_000 records, which is larger than
+ // the OPERATOR_SKETCH_SWITCH_THRESHOLD of 10_000.
+ // This should trigger the stats migration.
+ int maxId = checkpointId < 1 ? 1_000 : 11_000;
+ List<Row> rows = Lists.newArrayListWithCapacity(maxId);
+ for (int j = 0; j < maxId; ++j) {
+ // fixed value "a" for the data (possible partition column)
+ rows.add(Row.of(j, "a"));
+ }
+
+ rowsPerCheckpoint.add(rows);
+ }
+
+ DataStream<Row> dataStream =
+ env.addSource(createRangeDistributionBoundedSource(rowsPerCheckpoint), ROW_TYPE_INFO);
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism)
+ .rangeDistributionStatisticsType(StatisticsType.Auto)
+ .append();
+ env.execute(getClass().getSimpleName());
+
+ table.refresh();
+ // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+ // only keep the snapshots with added data files
+ snapshots =
+ snapshots.stream()
+ .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+ .collect(Collectors.toList());
+
+ // Sometimes we will have more checkpoints than the bounded source if we pass the
+ // auto checkpoint interval, thus producing multiple snapshots.
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+ // It takes 2 checkpoint cycles for statistics collection and application
+ // of the globally aggregated statistics in the range partitioner.
+ // The last two checkpoints should have range shuffle applied
+ List<Snapshot> rangePartitionedCycles =
+ snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+ // Since the input has a single value for the data column, all rows land in the
+ // same partition. Hence there is no difference between the partitioned and
+ // unpartitioned cases.
+ for (Snapshot snapshot : rangePartitionedCycles) {
+ List<DataFile> addedDataFiles =
+ Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+ // each writer task should only write one file for non-partition sort column
+ // sometimes
+ assertThat(addedDataFiles).hasSize(parallelism);
+ // verify there is no overlap in min-max stats range
+ if (parallelism == 2) {
+ assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+ }
+ }
+ }
+
+ private BoundedTestSource<Row> createRangeDistributionBoundedSource(
+ List<List<Row>> rowsPerCheckpoint) {
+ return new BoundedTestSource<>(rowsPerCheckpoint);
+ }
+
+ private List<List<Row>> createCharRows(int numOfCheckpoints, int countPerChar) {
+ List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+ for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+ List<Row> rows = Lists.newArrayListWithCapacity(26 * countPerChar);
+ for (int j = 0; j < countPerChar; ++j) {
+ for (char c = 'a'; c <= 'z'; ++c) {
+ rows.add(Row.of(1, String.valueOf(c)));
+ }
+ }
+
+ rowsPerCheckpoint.add(rows);
+ }
+
+ return rowsPerCheckpoint;
+ }
+
+ private List<List<Row>> createIntRows(int numOfCheckpoints, int maxId) {
+ List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+ for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+ List<Row> rows = Lists.newArrayListWithCapacity(maxId);
+ for (int j = 0; j < maxId; ++j) {
+ // fixed value "a" for the data (possible partition column)
+ rows.add(Row.of(j, "a"));
+ }
+
+ rowsPerCheckpoint.add(rows);
+ }
+
+ return rowsPerCheckpoint;
+ }
+
+ private void assertIdColumnStatsNoRangeOverlap(DataFile file1, DataFile file2) {
+ // id column has fieldId 1
+ int file1LowerBound =
+ Conversions.fromByteBuffer(Types.IntegerType.get(), file1.lowerBounds().get(1));
+ int file1UpperBound =
+ Conversions.fromByteBuffer(Types.IntegerType.get(), file1.upperBounds().get(1));
+ int file2LowerBound =
+ Conversions.fromByteBuffer(Types.IntegerType.get(), file2.lowerBounds().get(1));
+ int file2UpperBound =
+ Conversions.fromByteBuffer(Types.IntegerType.get(), file2.upperBounds().get(1));
+
+ if (file1LowerBound < file2LowerBound) {
+ assertThat(file1UpperBound).isLessThanOrEqualTo(file2LowerBound);
+ } else {
+ assertThat(file2UpperBound).isLessThanOrEqualTo(file1LowerBound);
+ }
+ }
}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 577c549..b283b83 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -30,6 +30,7 @@
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ParameterizedTestExtension;
@@ -184,11 +185,21 @@
.hasMessage(
"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
- assertThatThrownBy(
- () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
- .isInstanceOf(IllegalStateException.class)
- .hasMessage(
- "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+ if (writeDistributionMode.equals(DistributionMode.RANGE.modeName()) && !partitioned) {
+ // validation error thrown from distributeDataStream
+ assertThatThrownBy(
+ () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+ } else {
+ // validation error thrown from appendWriter
+ assertThatThrownBy(
+ () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+ }
}
@TestTemplate
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
index 7167859..d5eea67 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -53,6 +53,10 @@
return new LongConfParser();
}
+ public DoubleConfParser doubleConf() {
+ return new DoubleConfParser();
+ }
+
public <E extends Enum<E>> EnumConfParser<E> enumConfParser(Class<E> enumClass) {
return new EnumConfParser<>(enumClass);
}
@@ -135,6 +139,29 @@
}
}
+ class DoubleConfParser extends ConfParser<DoubleConfParser, Double> {
+ private Double defaultValue;
+
+ @Override
+ protected DoubleConfParser self() {
+ return this;
+ }
+
+ public DoubleConfParser defaultValue(double value) {
+ this.defaultValue = value;
+ return self();
+ }
+
+ public double parse() {
+ Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+ return parse(Double::parseDouble, defaultValue);
+ }
+
+ public Double parseOptional() {
+ return parse(Double::parseDouble, null);
+ }
+ }
+
class StringConfParser extends ConfParser<StringConfParser, String> {
private String defaultValue;
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index ca7b112..a31902d 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -26,6 +26,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
/**
* A class for common Iceberg configs for Flink writes.
@@ -167,6 +168,26 @@
return DistributionMode.fromName(modeName);
}
+ public StatisticsType rangeDistributionStatisticsType() {
+ String name =
+ confParser
+ .stringConf()
+ .option(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key())
+ .flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE)
+ .defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.defaultValue())
+ .parse();
+ return StatisticsType.valueOf(name);
+ }
+
+ public double rangeDistributionSortKeyBaseWeight() {
+ return confParser
+ .doubleConf()
+ .option(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key())
+ .flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT)
+ .defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.defaultValue())
+ .parse();
+ }
+
public int workerPoolSize() {
return confParser
.intConf()
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index df73f2e..c352867 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -23,6 +23,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
/** Flink sink write options */
public class FlinkWriteOptions {
@@ -60,6 +61,19 @@
public static final ConfigOption<String> DISTRIBUTION_MODE =
ConfigOptions.key("distribution-mode").stringType().noDefaultValue();
+ public static final ConfigOption<String> RANGE_DISTRIBUTION_STATISTICS_TYPE =
+ ConfigOptions.key("range-distribution-statistics-type")
+ .stringType()
+ .defaultValue(StatisticsType.Auto.name())
+ .withDescription("Type of statistics collection: Auto, Map, Sketch");
+
+ public static final ConfigOption<Double> RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT =
+ ConfigOptions.key("range-distribution-sort-key-base-weight")
+ .doubleType()
+ .defaultValue(0.0d)
+ .withDescription(
+ "Base weight for every sort key relative to target weight per writer task");
+
// Branch to write to
public static final ConfigOption<String> BRANCH =
ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 769af7d..2256d1e 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -53,13 +53,19 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperatorFactory;
+import org.apache.iceberg.flink.sink.shuffle.RangePartitioner;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -233,9 +239,6 @@
* @return {@link Builder} to connect the iceberg table.
*/
public Builder distributionMode(DistributionMode mode) {
- Preconditions.checkArgument(
- !DistributionMode.RANGE.equals(mode),
- "Flink does not support 'range' write distribution mode now.");
if (mode != null) {
writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
}
@@ -243,6 +246,62 @@
}
/**
+ * Range distribution needs to collect statistics about the data distribution in order to
+ * shuffle records in a relatively balanced way. In general, low-cardinality sort keys should
+ * use {@link StatisticsType#Map} and high-cardinality sort keys should use {@link
+ * StatisticsType#Sketch}. Refer to the {@link StatisticsType} Javadoc for more details.
+ *
+ * <p>Default is {@link StatisticsType#Auto}, where Map statistics are used initially. If the
+ * cardinality grows beyond the threshold (currently 10K) defined in {@code
+ * SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD}, statistics collection automatically switches to
+ * sketch reservoir sampling.
+ *
+ * <p>Explicitly set the statistics type if the default behavior doesn't work.
+ *
+ * @param type to specify the statistics type for range distribution.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public Builder rangeDistributionStatisticsType(StatisticsType type) {
+ if (type != null) {
+ writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name());
+ }
+ return this;
+ }
+
+ /**
+ * If the sort order contains partition columns, each sort key maps to one partition and data
+ * file. This relative weight helps avoid creating too many small files for sort keys with low
+ * traffic. It is a double value that defines the minimal weight for each sort key. `0.02` means
+ * each key has a base weight of `2%` of the targeted traffic weight per writer task.
+ *
+ * <p>E.g. the sink Iceberg table is partitioned daily by event time. Assume the data stream
+ * contains events from now up to 180 days ago. With event time, the traffic weight distribution
+ * across different days typically has a long-tail pattern: the current day contains the most
+ * traffic, and the older days (the long tail) contain less and less traffic. Assume writer
+ * parallelism is `10` and the total weight across all 180 days is `10,000`. The target traffic
+ * weight per writer task would be `1,000`. Assume the weight sum for the oldest 150 days is
+ * `1,000`. Normally, the range partitioner would put all of the oldest 150 days in one writer
+ * task. That writer task would write 150 small files (one per day). Keeping 150 open files can
+ * potentially consume a large amount of memory, and flushing and uploading 150 files (however
+ * small) at checkpoint time can also be slow. If this config is set to `0.02`, every sort key
+ * has a base weight of `2%` of the targeted weight of `1,000` for every writer task. That
+ * essentially avoids placing more than `50` data files (one per day) on one writer task, no
+ * matter how small they are.
+ *
+ * <p>This is only applicable to {@link StatisticsType#Map} for the low-cardinality scenario.
+ * With {@link StatisticsType#Sketch}, high-cardinality sort columns are usually not used as
+ * partition columns; otherwise, too many partitions and small files would be generated during
+ * write. The sketch range partitioner simply splits high-cardinality keys into ordered ranges.
+ *
+ * <p>Default is {@code 0.0} (no base weight).
+ */
+ public Builder rangeDistributionSortKeyBaseWeight(double weight) {
+ writeOptions.put(
+ FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key(), Double.toString(weight));
+ return this;
+ }
+
+ /**
* Configuring the write parallel number for iceberg stream writer.
*
* @param newWriteParallelism the number of parallel iceberg stream writer.
@@ -349,18 +408,20 @@
// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
- // Convert the requested flink table schema to flink row type.
RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+ int writerParallelism =
+ flinkWriteConf.writeParallelism() == null
+ ? rowDataInput.getParallelism()
+ : flinkWriteConf.writeParallelism();
// Distribute the records from input data stream based on the write.distribution-mode and
// equality fields.
DataStream<RowData> distributeStream =
- distributeDataStream(
- rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
+ distributeDataStream(rowDataInput, equalityFieldIds, flinkRowType, writerParallelism);
// Add parallel writers that append rows to files
SingleOutputStreamOperator<WriteResult> writerStream =
- appendWriter(distributeStream, flinkRowType, equalityFieldIds);
+ appendWriter(distributeStream, flinkRowType, equalityFieldIds, writerParallelism);
// Add single-parallelism committer that commits files
// after successful checkpoint or end of input
@@ -447,7 +508,10 @@
}
private SingleOutputStreamOperator<WriteResult> appendWriter(
- DataStream<RowData> input, RowType flinkRowType, List<Integer> equalityFieldIds) {
+ DataStream<RowData> input,
+ RowType flinkRowType,
+ List<Integer> equalityFieldIds,
+ int writerParallelism) {
// Validate the equality fields and partition fields if we enable the upsert mode.
if (flinkWriteConf.upsertMode()) {
Preconditions.checkState(
@@ -481,17 +545,13 @@
IcebergStreamWriter<RowData> streamWriter =
createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds);
- int parallelism =
- flinkWriteConf.writeParallelism() == null
- ? input.getParallelism()
- : flinkWriteConf.writeParallelism();
SingleOutputStreamOperator<WriteResult> writerStream =
input
.transform(
operatorName(ICEBERG_STREAM_WRITER_NAME),
TypeInformation.of(WriteResult.class),
streamWriter)
- .setParallelism(parallelism);
+ .setParallelism(writerParallelism);
if (uidPrefix != null) {
writerStream = writerStream.uid(uidPrefix + "-writer");
}
@@ -501,12 +561,15 @@
private DataStream<RowData> distributeDataStream(
DataStream<RowData> input,
List<Integer> equalityFieldIds,
- PartitionSpec partitionSpec,
- Schema iSchema,
- RowType flinkRowType) {
+ RowType flinkRowType,
+ int writerParallelism) {
DistributionMode writeMode = flinkWriteConf.distributionMode();
-
LOG.info("Write distribution mode is '{}'", writeMode.modeName());
+
+ Schema iSchema = table.schema();
+ PartitionSpec partitionSpec = table.spec();
+ SortOrder sortOrder = table.sortOrder();
+
switch (writeMode) {
case NONE:
if (equalityFieldIds.isEmpty()) {
@@ -548,21 +611,52 @@
}
case RANGE:
- if (equalityFieldIds.isEmpty()) {
+ // Ideally, an exception should be thrown for the combination of range distribution and
+ // equality fields. The primary key case should use hash distribution mode.
+ // Keep the current behavior of falling back to keyBy for backward compatibility.
+ if (!equalityFieldIds.isEmpty()) {
LOG.warn(
- "Fallback to use 'none' distribution mode, because there are no equality fields set "
- + "and {}=range is not supported yet in flink",
- WRITE_DISTRIBUTION_MODE);
- return input;
- } else {
- LOG.info(
- "Distribute rows by equality fields, because there are equality fields set "
- + "and{}=range is not supported yet in flink",
+ "Hash distribute rows by equality fields, even though {}=range is set. "
+ + "Range distribution for primary keys are not always safe in "
+ + "Flink streaming writer.",
WRITE_DISTRIBUTION_MODE);
return input.keyBy(
new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
}
+ // Range distribute by the sort key if the table has a SortOrder, otherwise by partition key
+ Preconditions.checkState(
+ sortOrder.isSorted() || partitionSpec.isPartitioned(),
+ "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+ if (sortOrder.isUnsorted()) {
+ sortOrder = Partitioning.sortOrderFor(partitionSpec);
+ LOG.info("Construct sort order from partition spec");
+ }
+
+ LOG.info("Range distribute rows by sort order: {}", sortOrder);
+ StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
+ SingleOutputStreamOperator<StatisticsOrRecord> shuffleStream =
+ input
+ .transform(
+ operatorName("range-shuffle"),
+ TypeInformation.of(StatisticsOrRecord.class),
+ new DataStatisticsOperatorFactory(
+ iSchema,
+ sortOrder,
+ writerParallelism,
+ statisticsType,
+ flinkWriteConf.rangeDistributionSortKeyBaseWeight()))
+ // Set the parallelism to match the input operator to encourage chaining
+ .setParallelism(input.getParallelism());
+ if (uidPrefix != null) {
+ shuffleStream = shuffleStream.uid(uidPrefix + "-shuffle");
+ }
+
+ return shuffleStream
+ .partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r)
+ .filter(StatisticsOrRecord::hasRecord)
+ .map(StatisticsOrRecord::record);
+
default:
throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
}
@@ -577,12 +671,9 @@
TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
// We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will
- // be promoted to
- // iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1
- // 'byte'), we will
- // read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here
- // we must use flink
- // schema.
+ // be promoted to iceberg INTEGER, which means that if we use iceberg's table schema to read
+ // TINYINT (backed by 1 'byte'), we will read 4 bytes rather than 1 byte, and that will mess up
+ // the byte array in BinaryRowData. So here we must use the flink schema.
return (RowType) requestedSchema.toRowDataType().getLogicalType();
} else {
return FlinkSchemaUtil.convert(schema);
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java
new file mode 100644
index 0000000..dc147bf
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+
+@Internal
+public class DataStatisticsOperatorFactory extends AbstractStreamOperatorFactory<StatisticsOrRecord>
+ implements CoordinatedOperatorFactory<StatisticsOrRecord>,
+ OneInputStreamOperatorFactory<RowData, StatisticsOrRecord> {
+
+ private final Schema schema;
+ private final SortOrder sortOrder;
+ private final int downstreamParallelism;
+ private final StatisticsType type;
+ private final double closeFileCostWeightPercentage;
+
+ public DataStatisticsOperatorFactory(
+ Schema schema,
+ SortOrder sortOrder,
+ int downstreamParallelism,
+ StatisticsType type,
+ double closeFileCostWeightPercentage) {
+ this.schema = schema;
+ this.sortOrder = sortOrder;
+ this.downstreamParallelism = downstreamParallelism;
+ this.type = type;
+ this.closeFileCostWeightPercentage = closeFileCostWeightPercentage;
+ }
+
+ @Override
+ public OperatorCoordinator.Provider getCoordinatorProvider(
+ String operatorName, OperatorID operatorID) {
+ return new DataStatisticsCoordinatorProvider(
+ operatorName,
+ operatorID,
+ schema,
+ sortOrder,
+ downstreamParallelism,
+ type,
+ closeFileCostWeightPercentage);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends StreamOperator<StatisticsOrRecord>> T createStreamOperator(
+ StreamOperatorParameters<StatisticsOrRecord> parameters) {
+ OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+ String operatorName = parameters.getStreamConfig().getOperatorName();
+ OperatorEventGateway gateway =
+ parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+
+ DataStatisticsOperator rangeStatisticsOperator =
+ new DataStatisticsOperator(
+ operatorName, schema, sortOrder, gateway, downstreamParallelism, type);
+
+ rangeStatisticsOperator.setup(
+ parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+ parameters
+ .getOperatorEventDispatcher()
+ .registerEventHandler(operatorId, rangeStatisticsOperator);
+
+ return (T) rangeStatisticsOperator;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+ return DataStatisticsOperator.class;
+ }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
index 482cfd1..b63547d 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
@@ -20,6 +20,7 @@
import static org.apache.iceberg.flink.FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HADOOP;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.util.Arrays;
@@ -46,6 +47,7 @@
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
@@ -54,6 +56,7 @@
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -241,4 +244,93 @@
sql("DROP TABLE IF EXISTS %s.%s", FLINK_DATABASE, tableName);
}
}
+
+ @TestTemplate
+ public void testRangeDistributionPartitionColumn() {
+ // Range partitioner currently only works with streaming writes (with checkpoints)
+ assumeThat(isStreamingJob).isTrue();
+
+ // Initialize a BoundedSource table to precisely emit those rows in only one checkpoint.
+ List<List<Row>> rowsPerCheckpoint =
+ IntStream.range(1, 6)
+ .mapToObj(
+ checkpointId -> {
+ List<Row> charRows = Lists.newArrayList();
+ // emit 26x10 rows for each checkpoint cycle
+ for (int i = 0; i < 10; ++i) {
+ for (char c = 'a'; c <= 'z'; c++) {
+ charRows.add(Row.of(c - 'a', String.valueOf(c)));
+ }
+ }
+ return charRows;
+ })
+ .collect(Collectors.toList());
+ List<Row> flattenedRows =
+ rowsPerCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList());
+
+ String dataId = BoundedTableFactory.registerDataSet(rowsPerCheckpoint);
+ sql(
+ "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+ + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+ SOURCE_TABLE, dataId);
+
+ assertThat(sql("SELECT * FROM %s", SOURCE_TABLE))
+ .as("Should have the expected rows in source table.")
+ .containsExactlyInAnyOrderElementsOf(flattenedRows);
+
+ Map<String, String> tableProps =
+ ImmutableMap.of(
+ "write.format.default",
+ FileFormat.PARQUET.name(),
+ TableProperties.WRITE_DISTRIBUTION_MODE,
+ DistributionMode.RANGE.modeName());
+
+ String tableName = "test_range_distribution_partition_column";
+ sql(
+ "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
+ tableName, toWithClause(tableProps));
+
+ try {
+ // Insert data set.
+ sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
+
+ assertThat(sql("SELECT * FROM %s", tableName))
+ .as("Should have the expected rows in sink table.")
+ .containsExactlyInAnyOrderElementsOf(flattenedRows);
+
+ Table table = catalog.loadTable(TableIdentifier.of(ICEBERG_NAMESPACE, tableName));
+ // snapshots are in chronological order, from the oldest to the newest
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+ // only keep the snapshots with added data files
+ snapshots =
+ snapshots.stream()
+ .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+ .collect(Collectors.toList());
+
+ // Sometimes there are more checkpoints than bounded-source batches, because the
+ // auto checkpoint interval may also fire, producing additional snapshots.
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(5);
+
+ // It takes 2 checkpoint cycles for the statistics to be collected and for the
+ // globally aggregated statistics to be applied by the range partitioner.
+ // The last two checkpoints should therefore have range shuffle applied.
+ List<Snapshot> rangePartitionedCycles =
+ snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+ for (Snapshot snapshot : rangePartitionedCycles) {
+ List<DataFile> addedDataFiles =
+ Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+ // Range partitioning assigns each partition to only one writer task, so each
+ // snapshot should contain at most 26 data files. There may be fewer than 26
+ // partitions per snapshot, as BoundedSource doesn't always precisely control
+ // the checkpoint boundary and the test condition is hard to control precisely
+ // in SQL tests; only minimal safe assertions are applied here to avoid flakiness.
+ // Without range shuffling, the number of data files could be as high as
+ // 26 * 4, since the default parallelism of the mini cluster is 4.
+ assertThat(addedDataFiles).hasSizeLessThanOrEqualTo(26);
+ }
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", FLINK_DATABASE, tableName);
+ }
+ }
}
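
The SQL test above uses the partition column as the implicit range key because the table defines no sort order. When an explicit sort order is preferred, it can be declared on the Iceberg table itself, which is exactly what the DataStream tests in the next file do. A minimal sketch, assuming a loaded Table handle named table:

    // With a sort order present, range distribution shuffles on the declared sort key
    // ("data" here) instead of falling back to the partition columns.
    table.replaceSortOrder().asc("data").commit();
    table
        .updateProperties()
        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
        .commit();
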
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
index 75e397d..df8c3c7 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
@@ -20,28 +20,37 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -177,4 +186,309 @@
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid distribution mode: UNRECOGNIZED");
}
+
+ @TestTemplate
+ public void testRangeDistributionWithoutSortOrderUnpartitioned() throws Exception {
+ assumeThat(partitioned).isFalse();
+
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+ .commit();
+
+ int numOfCheckpoints = 6;
+ DataStream<Row> dataStream =
+ env.addSource(
+ createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+ ROW_TYPE_INFO);
+ FlinkSink.Builder builder =
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism);
+
+ // Range distribution requires either sort order or partition spec defined
+ assertThatThrownBy(builder::append)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+ }
+
+ @TestTemplate
+ public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception {
+ assumeThat(partitioned).isTrue();
+
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+ .commit();
+
+ int numOfCheckpoints = 6;
+ DataStream<Row> dataStream =
+ env.addSource(
+ createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+ ROW_TYPE_INFO);
+ FlinkSink.Builder builder =
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism);
+
+ // sort based on partition columns
+ builder.append();
+ env.execute(getClass().getSimpleName());
+
+ table.refresh();
+ // snapshots are in chronological order, from the oldest to the newest
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+ // only keep the snapshots with added data files
+ snapshots =
+ snapshots.stream()
+ .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+ .collect(Collectors.toList());
+
+ // Sometimes there are more checkpoints than bounded-source batches, because the
+ // auto checkpoint interval may also fire, producing additional snapshots.
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+ }
+
+ @TestTemplate
+ public void testRangeDistributionWithSortOrder() throws Exception {
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+ .commit();
+ table.replaceSortOrder().asc("data").commit();
+
+ int numOfCheckpoints = 6;
+ DataStream<Row> dataStream =
+ env.addSource(
+ createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+ ROW_TYPE_INFO);
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism)
+ .rangeDistributionStatisticsType(StatisticsType.Map)
+ .append();
+ env.execute(getClass().getSimpleName());
+
+ table.refresh();
+ // snapshots are in chronological order, from the oldest to the newest
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+ // only keep the snapshots with added data files
+ snapshots =
+ snapshots.stream()
+ .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+ .collect(Collectors.toList());
+
+ // Sometimes there are more checkpoints than bounded-source batches, because the
+ // auto checkpoint interval may also fire, producing additional snapshots.
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+ // It takes 2 checkpoint cycles for the statistics to be collected and for the
+ // globally aggregated statistics to be applied by the range partitioner.
+ // The last two checkpoints should therefore have range shuffle applied.
+ List<Snapshot> rangePartitionedCycles =
+ snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+ if (partitioned) {
+ for (Snapshot snapshot : rangePartitionedCycles) {
+ List<DataFile> addedDataFiles =
+ Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+ // up to 26 partitions
+ assertThat(addedDataFiles).hasSizeLessThanOrEqualTo(26);
+ }
+ } else {
+ for (Snapshot snapshot : rangePartitionedCycles) {
+ List<DataFile> addedDataFiles =
+ Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+ // each writer task should write only one file, since the sort column is not a partition column
+ assertThat(addedDataFiles).hasSize(parallelism);
+ // verify there is no overlap in min-max stats range
+ if (parallelism == 2) {
+ assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+ }
+ }
+ }
+ }
+
+ @TestTemplate
+ public void testRangeDistributionSketchWithSortOrder() throws Exception {
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+ .commit();
+ table.replaceSortOrder().asc("id").commit();
+
+ int numOfCheckpoints = 6;
+ DataStream<Row> dataStream =
+ env.addSource(
+ createRangeDistributionBoundedSource(createIntRows(numOfCheckpoints, 1_000)),
+ ROW_TYPE_INFO);
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism)
+ .rangeDistributionStatisticsType(StatisticsType.Sketch)
+ .append();
+ env.execute(getClass().getSimpleName());
+
+ table.refresh();
+ // snapshots are in chronological order, from the oldest to the newest
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+ // only keep the snapshots with added data files
+ snapshots =
+ snapshots.stream()
+ .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+ .collect(Collectors.toList());
+
+ // Sometimes there are more checkpoints than bounded-source batches, because the
+ // auto checkpoint interval may also fire, producing additional snapshots.
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+ // It takes 2 checkpoint cycles for the statistics to be collected and for the
+ // globally aggregated statistics to be applied by the range partitioner.
+ // The last two checkpoints should therefore have range shuffle applied.
+ List<Snapshot> rangePartitionedCycles =
+ snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+ // Since the input has a single value for the data column, every row lands in the
+ // same partition, so partitioned and unpartitioned tables behave the same here.
+ for (Snapshot snapshot : rangePartitionedCycles) {
+ List<DataFile> addedDataFiles =
+ Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+ // each writer task should write only one file, since the sort column is not a partition column
+ assertThat(addedDataFiles).hasSize(parallelism);
+ // verify there is no overlap in min-max stats range
+ if (parallelism == 2) {
+ assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+ }
+ }
+ }
+
+ /** Test migration from Map stats to Sketch stats */
+ @TestTemplate
+ public void testRangeDistributionStatisticsMigration() throws Exception {
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+ .commit();
+ table.replaceSortOrder().asc("id").commit();
+
+ int numOfCheckpoints = 4;
+ List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+ for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+ // Every checkpoint after the first emits 11_000 records, which is larger than
+ // the OPERATOR_SKETCH_SWITCH_THRESHOLD of 10_000.
+ // This should trigger the migration from Map to Sketch statistics.
+ int maxId = checkpointId < 1 ? 1_000 : 11_000;
+ List<Row> rows = Lists.newArrayListWithCapacity(maxId);
+ for (int j = 0; j < maxId; ++j) {
+ // fixed value "a" for the data (possible partition column)
+ rows.add(Row.of(j, "a"));
+ }
+
+ rowsPerCheckpoint.add(rows);
+ }
+
+ DataStream<Row> dataStream =
+ env.addSource(createRangeDistributionBoundedSource(rowsPerCheckpoint), ROW_TYPE_INFO);
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism)
+ .rangeDistributionStatisticsType(StatisticsType.Auto)
+ .append();
+ env.execute(getClass().getSimpleName());
+
+ table.refresh();
+ // snapshots are in chronological order, from the oldest to the newest
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+ // only keep the snapshots with added data files
+ snapshots =
+ snapshots.stream()
+ .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+ .collect(Collectors.toList());
+
+ // Sometimes there are more checkpoints than bounded-source batches, because the
+ // auto checkpoint interval may also fire, producing additional snapshots.
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+ // It takes 2 checkpoint cycles for the statistics to be collected and for the
+ // globally aggregated statistics to be applied by the range partitioner.
+ // The last two checkpoints should therefore have range shuffle applied.
+ List<Snapshot> rangePartitionedCycles =
+ snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+ // Since the input has a single value for the data column, every row lands in the
+ // same partition, so partitioned and unpartitioned tables behave the same here.
+ for (Snapshot snapshot : rangePartitionedCycles) {
+ List<DataFile> addedDataFiles =
+ Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+ // each writer task should write only one file, since the sort column is not a partition column
+ assertThat(addedDataFiles).hasSize(parallelism);
+ // verify there is no overlap in min-max stats range
+ if (parallelism == 2) {
+ assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+ }
+ }
+ }
+
+ private BoundedTestSource<Row> createRangeDistributionBoundedSource(
+ List<List<Row>> rowsPerCheckpoint) {
+ return new BoundedTestSource<>(rowsPerCheckpoint);
+ }
+
+ private List<List<Row>> createCharRows(int numOfCheckpoints, int countPerChar) {
+ List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+ for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+ List<Row> rows = Lists.newArrayListWithCapacity(26 * countPerChar);
+ for (int j = 0; j < countPerChar; ++j) {
+ for (char c = 'a'; c <= 'z'; ++c) {
+ rows.add(Row.of(1, String.valueOf(c)));
+ }
+ }
+
+ rowsPerCheckpoint.add(rows);
+ }
+
+ return rowsPerCheckpoint;
+ }
+
+ private List<List<Row>> createIntRows(int numOfCheckpoints, int maxId) {
+ List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+ for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+ List<Row> rows = Lists.newArrayListWithCapacity(maxId);
+ for (int j = 0; j < maxId; ++j) {
+ // fixed value "a" for the data (possible partition column)
+ rows.add(Row.of(j, "a"));
+ }
+
+ rowsPerCheckpoint.add(rows);
+ }
+
+ return rowsPerCheckpoint;
+ }
+
+ private void assertIdColumnStatsNoRangeOverlap(DataFile file1, DataFile file2) {
+ // id column has fieldId 1
+ int file1LowerBound =
+ Conversions.fromByteBuffer(Types.IntegerType.get(), file1.lowerBounds().get(1));
+ int file1UpperBound =
+ Conversions.fromByteBuffer(Types.IntegerType.get(), file1.upperBounds().get(1));
+ int file2LowerBound =
+ Conversions.fromByteBuffer(Types.IntegerType.get(), file2.lowerBounds().get(1));
+ int file2UpperBound =
+ Conversions.fromByteBuffer(Types.IntegerType.get(), file2.upperBounds().get(1));
+
+ if (file1LowerBound < file2LowerBound) {
+ assertThat(file1UpperBound).isLessThanOrEqualTo(file2LowerBound);
+ } else {
+ assertThat(file2UpperBound).isLessThanOrEqualTo(file1LowerBound);
+ }
+ }
}
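
The min/max overlap check above is only applied when parallelism == 2. A possible generalization to any parallelism is sketched below; the helper name assertIdColumnRangesDisjoint is hypothetical and not part of the patch, and it assumes the same test-class imports plus java.util.Comparator.

    // Hedged sketch: sort data files by the id column's lower bound (fieldId 1) and
    // require each upper bound to be <= the next file's lower bound, i.e. the writer
    // tasks received disjoint key ranges.
    private void assertIdColumnRangesDisjoint(List<DataFile> files) {
      List<DataFile> sorted = Lists.newArrayList(files);
      sorted.sort(
          Comparator.comparing(
              (DataFile file) ->
                  (Integer)
                      Conversions.fromByteBuffer(
                          Types.IntegerType.get(), file.lowerBounds().get(1))));
      for (int i = 0; i < sorted.size() - 1; ++i) {
        int upperBound =
            Conversions.fromByteBuffer(
                Types.IntegerType.get(), sorted.get(i).upperBounds().get(1));
        int nextLowerBound =
            Conversions.fromByteBuffer(
                Types.IntegerType.get(), sorted.get(i + 1).lowerBounds().get(1));
        assertThat(upperBound).isLessThanOrEqualTo(nextLowerBound);
      }
    }
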
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 577c549..b283b83 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -30,6 +30,7 @@
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ParameterizedTestExtension;
@@ -184,11 +185,21 @@
.hasMessage(
"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
- assertThatThrownBy(
- () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
- .isInstanceOf(IllegalStateException.class)
- .hasMessage(
- "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+ if (writeDistributionMode.equals(DistributionMode.RANGE.modeName()) && !partitioned) {
+ // validation error thrown from distributeDataStream
+ assertThatThrownBy(
+ () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+ } else {
+ // validation error thrown from appendWriter
+ assertThatThrownBy(
+ () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+ }
}
@TestTemplate