Flink: backport PR #10859 for range distribution (#10990)

diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
index 7167859..d5eea67 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -53,6 +53,10 @@
     return new LongConfParser();
   }
 
+  public DoubleConfParser doubleConf() {
+    return new DoubleConfParser();
+  }
+
   public <E extends Enum<E>> EnumConfParser<E> enumConfParser(Class<E> enumClass) {
     return new EnumConfParser<>(enumClass);
   }
@@ -135,6 +139,29 @@
     }
   }
 
+  class DoubleConfParser extends ConfParser<DoubleConfParser, Double> {
+    private Double defaultValue;
+
+    @Override
+    protected DoubleConfParser self() {
+      return this;
+    }
+
+    public DoubleConfParser defaultValue(double value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public double parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Double::parseDouble, defaultValue);
+    }
+
+    public Double parseOptional() {
+      return parse(Double::parseDouble, null);
+    }
+  }
+
   class StringConfParser extends ConfParser<StringConfParser, String> {
     private String defaultValue;
 
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index ca7b112..a31902d 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 
 /**
  * A class for common Iceberg configs for Flink writes.
@@ -167,6 +168,26 @@
     return DistributionMode.fromName(modeName);
   }
 
+  public StatisticsType rangeDistributionStatisticsType() {
+    String name =
+        confParser
+            .stringConf()
+            .option(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key())
+            .flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE)
+            .defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.defaultValue())
+            .parse();
+    return StatisticsType.valueOf(name);
+  }
+
+  public double rangeDistributionSortKeyBaseWeight() {
+    return confParser
+        .doubleConf()
+        .option(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key())
+        .flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT)
+        .defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.defaultValue())
+        .parse();
+  }
+
   public int workerPoolSize() {
     return confParser
         .intConf()
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index df73f2e..c352867 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -23,6 +23,7 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 
 /** Flink sink write options */
 public class FlinkWriteOptions {
@@ -60,6 +61,19 @@
   public static final ConfigOption<String> DISTRIBUTION_MODE =
       ConfigOptions.key("distribution-mode").stringType().noDefaultValue();
 
+  public static final ConfigOption<String> RANGE_DISTRIBUTION_STATISTICS_TYPE =
+      ConfigOptions.key("range-distribution-statistics-type")
+          .stringType()
+          .defaultValue(StatisticsType.Auto.name())
+          .withDescription("Type of statistics collection: Auto, Map, Sketch");
+
+  public static final ConfigOption<Double> RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT =
+      ConfigOptions.key("range-distribution-sort-key-base-weight")
+          .doubleType()
+          .defaultValue(0.0d)
+          .withDescription(
+              "Base weight for every sort key relative to target weight per writer task");
+
   // Branch to write to
   public static final ConfigOption<String> BRANCH =
       ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 769af7d..2256d1e 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -53,13 +53,19 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperatorFactory;
+import org.apache.iceberg.flink.sink.shuffle.RangePartitioner;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -233,9 +239,6 @@
      * @return {@link Builder} to connect the iceberg table.
      */
     public Builder distributionMode(DistributionMode mode) {
-      Preconditions.checkArgument(
-          !DistributionMode.RANGE.equals(mode),
-          "Flink does not support 'range' write distribution mode now.");
       if (mode != null) {
         writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
       }
@@ -243,6 +246,62 @@
     }
 
     /**
+     * Range distribution needs to collect statistics about data distribution to properly shuffle
+     * the records in relatively balanced way. In general, low cardinality should use {@link
+     * StatisticsType#Map} and high cardinality should use {@link StatisticsType#Sketch} Refer to
+     * {@link StatisticsType} Javadoc for more details.
+     *
+     * <p>Default is {@link StatisticsType#Auto} where initially Map statistics is used. But if
+     * cardinality is higher than the threshold (currently 10K) as defined in {@code
+     * SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD}, statistics collection automatically switches to
+     * the sketch reservoir sampling.
+     *
+     * <p>Explicit set the statistics type if the default behavior doesn't work.
+     *
+     * @param type to specify the statistics type for range distribution.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder rangeDistributionStatisticsType(StatisticsType type) {
+      if (type != null) {
+        writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name());
+      }
+      return this;
+    }
+
+    /**
+     * If sort order contains partition columns, each sort key would map to one partition and data
+     * file. This relative weight can avoid placing too many small files for sort keys with low
+     * traffic. It is a double value that defines the minimal weight for each sort key. `0.02` means
+     * each key has a base weight of `2%` of the targeted traffic weight per writer task.
+     *
+     * <p>E.g. the sink Iceberg table is partitioned daily by event time. Assume the data stream
+     * contains events from now up to 180 days ago. With event time, traffic weight distribution
+     * across different days typically has a long tail pattern. Current day contains the most
+     * traffic. The older days (long tail) contain less and less traffic. Assume writer parallelism
+     * is `10`. The total weight across all 180 days is `10,000`. Target traffic weight per writer
+     * task would be `1,000`. Assume the weight sum for the oldest 150 days is `1,000`. Normally,
+     * the range partitioner would put all the oldest 150 days in one writer task. That writer task
+     * would write to 150 small files (one per day). Keeping 150 open files can potentially consume
+     * large amount of memory. Flushing and uploading 150 files (however small) at checkpoint time
+     * can also be potentially slow. If this config is set to `0.02`. It means every sort key has a
+     * base weight of `2%` of targeted weight of `1,000` for every write task. It would essentially
+     * avoid placing more than `50` data files (one per day) on one writer task no matter how small
+     * they are.
+     *
+     * <p>This is only applicable to {@link StatisticsType#Map} for low-cardinality scenario. For
+     * {@link StatisticsType#Sketch} high-cardinality sort columns, they are usually not used as
+     * partition columns. Otherwise, too many partitions and small files may be generated during
+     * write. Sketch range partitioner simply splits high-cardinality keys into ordered ranges.
+     *
+     * <p>Default is {@code 0.0%}.
+     */
+    public Builder rangeDistributionSortKeyBaseWeight(double weight) {
+      writeOptions.put(
+          FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key(), Double.toString(weight));
+      return this;
+    }
+
+    /**
      * Configuring the write parallel number for iceberg stream writer.
      *
      * @param newWriteParallelism the number of parallel iceberg stream writer.
@@ -349,18 +408,20 @@
       // Find out the equality field id list based on the user-provided equality field column names.
       List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
 
-      // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+      int writerParallelism =
+          flinkWriteConf.writeParallelism() == null
+              ? rowDataInput.getParallelism()
+              : flinkWriteConf.writeParallelism();
 
       // Distribute the records from input data stream based on the write.distribution-mode and
       // equality fields.
       DataStream<RowData> distributeStream =
-          distributeDataStream(
-              rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
+          distributeDataStream(rowDataInput, equalityFieldIds, flinkRowType, writerParallelism);
 
       // Add parallel writers that append rows to files
       SingleOutputStreamOperator<WriteResult> writerStream =
-          appendWriter(distributeStream, flinkRowType, equalityFieldIds);
+          appendWriter(distributeStream, flinkRowType, equalityFieldIds, writerParallelism);
 
       // Add single-parallelism committer that commits files
       // after successful checkpoint or end of input
@@ -447,7 +508,10 @@
     }
 
     private SingleOutputStreamOperator<WriteResult> appendWriter(
-        DataStream<RowData> input, RowType flinkRowType, List<Integer> equalityFieldIds) {
+        DataStream<RowData> input,
+        RowType flinkRowType,
+        List<Integer> equalityFieldIds,
+        int writerParallelism) {
       // Validate the equality fields and partition fields if we enable the upsert mode.
       if (flinkWriteConf.upsertMode()) {
         Preconditions.checkState(
@@ -481,17 +545,13 @@
       IcebergStreamWriter<RowData> streamWriter =
           createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds);
 
-      int parallelism =
-          flinkWriteConf.writeParallelism() == null
-              ? input.getParallelism()
-              : flinkWriteConf.writeParallelism();
       SingleOutputStreamOperator<WriteResult> writerStream =
           input
               .transform(
                   operatorName(ICEBERG_STREAM_WRITER_NAME),
                   TypeInformation.of(WriteResult.class),
                   streamWriter)
-              .setParallelism(parallelism);
+              .setParallelism(writerParallelism);
       if (uidPrefix != null) {
         writerStream = writerStream.uid(uidPrefix + "-writer");
       }
@@ -501,12 +561,15 @@
     private DataStream<RowData> distributeDataStream(
         DataStream<RowData> input,
         List<Integer> equalityFieldIds,
-        PartitionSpec partitionSpec,
-        Schema iSchema,
-        RowType flinkRowType) {
+        RowType flinkRowType,
+        int writerParallelism) {
       DistributionMode writeMode = flinkWriteConf.distributionMode();
-
       LOG.info("Write distribution mode is '{}'", writeMode.modeName());
+
+      Schema iSchema = table.schema();
+      PartitionSpec partitionSpec = table.spec();
+      SortOrder sortOrder = table.sortOrder();
+
       switch (writeMode) {
         case NONE:
           if (equalityFieldIds.isEmpty()) {
@@ -548,21 +611,52 @@
           }
 
         case RANGE:
-          if (equalityFieldIds.isEmpty()) {
+          // Ideally, exception should be thrown in the combination of range distribution and
+          // equality fields. Primary key case should use hash distribution mode.
+          // Keep the current behavior of falling back to keyBy for backward compatibility.
+          if (!equalityFieldIds.isEmpty()) {
             LOG.warn(
-                "Fallback to use 'none' distribution mode, because there are no equality fields set "
-                    + "and {}=range is not supported yet in flink",
-                WRITE_DISTRIBUTION_MODE);
-            return input;
-          } else {
-            LOG.info(
-                "Distribute rows by equality fields, because there are equality fields set "
-                    + "and{}=range is not supported yet in flink",
+                "Hash distribute rows by equality fields, even though {}=range is set. "
+                    + "Range distribution for primary keys are not always safe in "
+                    + "Flink streaming writer.",
                 WRITE_DISTRIBUTION_MODE);
             return input.keyBy(
                 new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
           }
 
+          // range distribute by partition key or sort key if table has an SortOrder
+          Preconditions.checkState(
+              sortOrder.isSorted() || partitionSpec.isPartitioned(),
+              "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+          if (sortOrder.isUnsorted()) {
+            sortOrder = Partitioning.sortOrderFor(partitionSpec);
+            LOG.info("Construct sort order from partition spec");
+          }
+
+          LOG.info("Range distribute rows by sort order: {}", sortOrder);
+          StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
+          SingleOutputStreamOperator<StatisticsOrRecord> shuffleStream =
+              input
+                  .transform(
+                      operatorName("range-shuffle"),
+                      TypeInformation.of(StatisticsOrRecord.class),
+                      new DataStatisticsOperatorFactory(
+                          iSchema,
+                          sortOrder,
+                          writerParallelism,
+                          statisticsType,
+                          flinkWriteConf.rangeDistributionSortKeyBaseWeight()))
+                  // Set the parallelism same as input operator to encourage chaining
+                  .setParallelism(input.getParallelism());
+          if (uidPrefix != null) {
+            shuffleStream = shuffleStream.uid(uidPrefix + "-shuffle");
+          }
+
+          return shuffleStream
+              .partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r)
+              .filter(StatisticsOrRecord::hasRecord)
+              .map(StatisticsOrRecord::record);
+
         default:
           throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
       }
@@ -577,12 +671,9 @@
       TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
 
       // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will
-      // be promoted to
-      // iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1
-      // 'byte'), we will
-      // read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here
-      // we must use flink
-      // schema.
+      // be promoted to iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT
+      // (backend by 1 'byte'), we will read 4 bytes rather than 1 byte, it will mess up the
+      // byte array in BinaryRowData. So here we must use flink schema.
       return (RowType) requestedSchema.toRowDataType().getLogicalType();
     } else {
       return FlinkSchemaUtil.convert(schema);
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java
new file mode 100644
index 0000000..dc147bf
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+
+@Internal
+public class DataStatisticsOperatorFactory extends AbstractStreamOperatorFactory<StatisticsOrRecord>
+    implements CoordinatedOperatorFactory<StatisticsOrRecord>,
+        OneInputStreamOperatorFactory<RowData, StatisticsOrRecord> {
+
+  private final Schema schema;
+  private final SortOrder sortOrder;
+  private final int downstreamParallelism;
+  private final StatisticsType type;
+  private final double closeFileCostWeightPercentage;
+
+  public DataStatisticsOperatorFactory(
+      Schema schema,
+      SortOrder sortOrder,
+      int downstreamParallelism,
+      StatisticsType type,
+      double closeFileCostWeightPercentage) {
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.downstreamParallelism = downstreamParallelism;
+    this.type = type;
+    this.closeFileCostWeightPercentage = closeFileCostWeightPercentage;
+  }
+
+  @Override
+  public OperatorCoordinator.Provider getCoordinatorProvider(
+      String operatorName, OperatorID operatorID) {
+    return new DataStatisticsCoordinatorProvider(
+        operatorName,
+        operatorID,
+        schema,
+        sortOrder,
+        downstreamParallelism,
+        type,
+        closeFileCostWeightPercentage);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends StreamOperator<StatisticsOrRecord>> T createStreamOperator(
+      StreamOperatorParameters<StatisticsOrRecord> parameters) {
+    OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+    String operatorName = parameters.getStreamConfig().getOperatorName();
+    OperatorEventGateway gateway =
+        parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+
+    DataStatisticsOperator rangeStatisticsOperator =
+        new DataStatisticsOperator(
+            operatorName, schema, sortOrder, gateway, downstreamParallelism, type);
+
+    rangeStatisticsOperator.setup(
+        parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+    parameters
+        .getOperatorEventDispatcher()
+        .registerEventHandler(operatorId, rangeStatisticsOperator);
+
+    return (T) rangeStatisticsOperator;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+    return DataStatisticsOperator.class;
+  }
+}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
index 75e397d..df8c3c7 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
@@ -20,28 +20,37 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -177,4 +186,309 @@
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid distribution mode: UNRECOGNIZED");
   }
+
+  @TestTemplate
+  public void testRangeDistributionWithoutSortOrderUnpartitioned() throws Exception {
+    assumeThat(partitioned).isFalse();
+
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =
+        env.addSource(
+            createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+            ROW_TYPE_INFO);
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .writeParallelism(parallelism);
+
+    // Range distribution requires either sort order or partition spec defined
+    assertThatThrownBy(builder::append)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage(
+            "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+  }
+
+  @TestTemplate
+  public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception {
+    assumeThat(partitioned).isTrue();
+
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =
+        env.addSource(
+            createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+            ROW_TYPE_INFO);
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .writeParallelism(parallelism);
+
+    // sort based on partition columns
+    builder.append();
+    env.execute(getClass().getSimpleName());
+
+    table.refresh();
+    // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+    // only keep the snapshots with added data files
+    snapshots =
+        snapshots.stream()
+            .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+            .collect(Collectors.toList());
+
+    // Sometimes we will have more checkpoints than the bounded source if we pass the
+    // auto checkpoint interval. Thus producing multiple snapshots.
+    assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+  }
+
+  @TestTemplate
+  public void testRangeDistributionWithSortOrder() throws Exception {
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+    table.replaceSortOrder().asc("data").commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =
+        env.addSource(
+            createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+            ROW_TYPE_INFO);
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .rangeDistributionStatisticsType(StatisticsType.Map)
+        .append();
+    env.execute(getClass().getSimpleName());
+
+    table.refresh();
+    // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+    // only keep the snapshots with added data files
+    snapshots =
+        snapshots.stream()
+            .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+            .collect(Collectors.toList());
+
+    // Sometimes we will have more checkpoints than the bounded source if we pass the
+    // auto checkpoint interval. Thus producing multiple snapshots.
+    assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+    // It takes 2 checkpoint cycle for statistics collection and application
+    // of the globally aggregated statistics in the range partitioner.
+    // The last two checkpoints should have range shuffle applied
+    List<Snapshot> rangePartitionedCycles =
+        snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+    if (partitioned) {
+      for (Snapshot snapshot : rangePartitionedCycles) {
+        List<DataFile> addedDataFiles =
+            Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+        // up to 26 partitions
+        assertThat(addedDataFiles).hasSizeLessThanOrEqualTo(26);
+      }
+    } else {
+      for (Snapshot snapshot : rangePartitionedCycles) {
+        List<DataFile> addedDataFiles =
+            Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+        // each writer task should only write one file for non-partition sort column
+        assertThat(addedDataFiles).hasSize(parallelism);
+        // verify there is no overlap in min-max stats range
+        if (parallelism == 2) {
+          assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+        }
+      }
+    }
+  }
+
+  @TestTemplate
+  public void testRangeDistributionSketchWithSortOrder() throws Exception {
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+    table.replaceSortOrder().asc("id").commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =
+        env.addSource(
+            createRangeDistributionBoundedSource(createIntRows(numOfCheckpoints, 1_000)),
+            ROW_TYPE_INFO);
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .rangeDistributionStatisticsType(StatisticsType.Sketch)
+        .append();
+    env.execute(getClass().getSimpleName());
+
+    table.refresh();
+    // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+    // only keep the snapshots with added data files
+    snapshots =
+        snapshots.stream()
+            .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+            .collect(Collectors.toList());
+
+    // Sometimes we will have more checkpoints than the bounded source if we pass the
+    // auto checkpoint interval. Thus producing multiple snapshots.
+    assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+    // It takes 2 checkpoint cycle for statistics collection and application
+    // of the globally aggregated statistics in the range partitioner.
+    // The last two checkpoints should have range shuffle applied
+    List<Snapshot> rangePartitionedCycles =
+        snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+    // since the input has a single value for the data column,
+    // it is always the same partition. Hence there is no difference
+    // for partitioned or not
+    for (Snapshot snapshot : rangePartitionedCycles) {
+      List<DataFile> addedDataFiles =
+          Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+      // each writer task should only write one file for non-partition sort column
+      assertThat(addedDataFiles).hasSize(parallelism);
+      // verify there is no overlap in min-max stats range
+      if (parallelism == 2) {
+        assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+      }
+    }
+  }
+
+  /** Test migration from Map stats to Sketch stats */
+  @TestTemplate
+  public void testRangeDistributionStatisticsMigration() throws Exception {
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+    table.replaceSortOrder().asc("id").commit();
+
+    int numOfCheckpoints = 4;
+    List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+    for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+      // checkpointId 2 would emit 11_000 records which is larger than
+      // the OPERATOR_SKETCH_SWITCH_THRESHOLD of 10_000.
+      // This should trigger the stats migration.
+      int maxId = checkpointId < 1 ? 1_000 : 11_000;
+      List<Row> rows = Lists.newArrayListWithCapacity(maxId);
+      for (int j = 0; j < maxId; ++j) {
+        // fixed value "a" for the data (possible partition column)
+        rows.add(Row.of(j, "a"));
+      }
+
+      rowsPerCheckpoint.add(rows);
+    }
+
+    DataStream<Row> dataStream =
+        env.addSource(createRangeDistributionBoundedSource(rowsPerCheckpoint), ROW_TYPE_INFO);
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .rangeDistributionStatisticsType(StatisticsType.Auto)
+        .append();
+    env.execute(getClass().getSimpleName());
+
+    table.refresh();
+    // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+    // only keep the snapshots with added data files
+    snapshots =
+        snapshots.stream()
+            .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+            .collect(Collectors.toList());
+
+    // Sometimes we will have more checkpoints than the bounded source if we pass the
+    // auto checkpoint interval. Thus producing multiple snapshots.
+    assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+    // It takes 2 checkpoint cycle for statistics collection and application
+    // of the globally aggregated statistics in the range partitioner.
+    // The last two checkpoints should have range shuffle applied
+    List<Snapshot> rangePartitionedCycles =
+        snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+    // since the input has a single value for the data column,
+    // it is always the same partition. Hence there is no difference
+    // for partitioned or not
+    for (Snapshot snapshot : rangePartitionedCycles) {
+      List<DataFile> addedDataFiles =
+          Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+      // each writer task should only write one file for non-partition sort column
+      // sometimes
+      assertThat(addedDataFiles).hasSize(parallelism);
+      // verify there is no overlap in min-max stats range
+      if (parallelism == 2) {
+        assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+      }
+    }
+  }
+
+  private BoundedTestSource<Row> createRangeDistributionBoundedSource(
+      List<List<Row>> rowsPerCheckpoint) {
+    return new BoundedTestSource<>(rowsPerCheckpoint);
+  }
+
+  private List<List<Row>> createCharRows(int numOfCheckpoints, int countPerChar) {
+    List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+    for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+      List<Row> rows = Lists.newArrayListWithCapacity(26 * countPerChar);
+      for (int j = 0; j < countPerChar; ++j) {
+        for (char c = 'a'; c <= 'z'; ++c) {
+          rows.add(Row.of(1, String.valueOf(c)));
+        }
+      }
+
+      rowsPerCheckpoint.add(rows);
+    }
+
+    return rowsPerCheckpoint;
+  }
+
+  private List<List<Row>> createIntRows(int numOfCheckpoints, int maxId) {
+    List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+    for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+      List<Row> rows = Lists.newArrayListWithCapacity(maxId);
+      for (int j = 0; j < maxId; ++j) {
+        // fixed value "a" for the data (possible partition column)
+        rows.add(Row.of(j, "a"));
+      }
+
+      rowsPerCheckpoint.add(rows);
+    }
+
+    return rowsPerCheckpoint;
+  }
+
+  private void assertIdColumnStatsNoRangeOverlap(DataFile file1, DataFile file2) {
+    // id column has fieldId 1
+    int file1LowerBound =
+        Conversions.fromByteBuffer(Types.IntegerType.get(), file1.lowerBounds().get(1));
+    int file1UpperBound =
+        Conversions.fromByteBuffer(Types.IntegerType.get(), file1.upperBounds().get(1));
+    int file2LowerBound =
+        Conversions.fromByteBuffer(Types.IntegerType.get(), file2.lowerBounds().get(1));
+    int file2UpperBound =
+        Conversions.fromByteBuffer(Types.IntegerType.get(), file2.upperBounds().get(1));
+
+    if (file1LowerBound < file2LowerBound) {
+      assertThat(file1UpperBound).isLessThanOrEqualTo(file2LowerBound);
+    } else {
+      assertThat(file2UpperBound).isLessThanOrEqualTo(file1LowerBound);
+    }
+  }
 }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 577c549..b283b83 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -30,6 +30,7 @@
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ParameterizedTestExtension;
@@ -184,11 +185,21 @@
         .hasMessage(
             "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
 
-    assertThatThrownBy(
-            () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
-        .isInstanceOf(IllegalStateException.class)
-        .hasMessage(
-            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+    if (writeDistributionMode.equals(DistributionMode.RANGE.modeName()) && !partitioned) {
+      // validation error thrown from distributeDataStream
+      assertThatThrownBy(
+              () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+          .isInstanceOf(IllegalStateException.class)
+          .hasMessage(
+              "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+    } else {
+      // validation error thrown from appendWriter
+      assertThatThrownBy(
+              () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+          .isInstanceOf(IllegalStateException.class)
+          .hasMessage(
+              "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+    }
   }
 
   @TestTemplate
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
index 7167859..d5eea67 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -53,6 +53,10 @@
     return new LongConfParser();
   }
 
+  public DoubleConfParser doubleConf() {
+    return new DoubleConfParser();
+  }
+
   public <E extends Enum<E>> EnumConfParser<E> enumConfParser(Class<E> enumClass) {
     return new EnumConfParser<>(enumClass);
   }
@@ -135,6 +139,29 @@
     }
   }
 
+  class DoubleConfParser extends ConfParser<DoubleConfParser, Double> {
+    private Double defaultValue;
+
+    @Override
+    protected DoubleConfParser self() {
+      return this;
+    }
+
+    public DoubleConfParser defaultValue(double value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public double parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Double::parseDouble, defaultValue);
+    }
+
+    public Double parseOptional() {
+      return parse(Double::parseDouble, null);
+    }
+  }
+
   class StringConfParser extends ConfParser<StringConfParser, String> {
     private String defaultValue;
 
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index ca7b112..a31902d 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 
 /**
  * A class for common Iceberg configs for Flink writes.
@@ -167,6 +168,26 @@
     return DistributionMode.fromName(modeName);
   }
 
+  public StatisticsType rangeDistributionStatisticsType() {
+    String name =
+        confParser
+            .stringConf()
+            .option(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key())
+            .flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE)
+            .defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.defaultValue())
+            .parse();
+    return StatisticsType.valueOf(name);
+  }
+
+  public double rangeDistributionSortKeyBaseWeight() {
+    return confParser
+        .doubleConf()
+        .option(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key())
+        .flinkConfig(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT)
+        .defaultValue(FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.defaultValue())
+        .parse();
+  }
+
   public int workerPoolSize() {
     return confParser
         .intConf()
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index df73f2e..c352867 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -23,6 +23,7 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 
 /** Flink sink write options */
 public class FlinkWriteOptions {
@@ -60,6 +61,19 @@
   public static final ConfigOption<String> DISTRIBUTION_MODE =
       ConfigOptions.key("distribution-mode").stringType().noDefaultValue();
 
+  public static final ConfigOption<String> RANGE_DISTRIBUTION_STATISTICS_TYPE =
+      ConfigOptions.key("range-distribution-statistics-type")
+          .stringType()
+          .defaultValue(StatisticsType.Auto.name())
+          .withDescription("Type of statistics collection: Auto, Map, Sketch");
+
+  public static final ConfigOption<Double> RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT =
+      ConfigOptions.key("range-distribution-sort-key-base-weight")
+          .doubleType()
+          .defaultValue(0.0d)
+          .withDescription(
+              "Base weight for every sort key relative to target weight per writer task");
+
   // Branch to write to
   public static final ConfigOption<String> BRANCH =
       ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 769af7d..2256d1e 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -53,13 +53,19 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperatorFactory;
+import org.apache.iceberg.flink.sink.shuffle.RangePartitioner;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -233,9 +239,6 @@
      * @return {@link Builder} to connect the iceberg table.
      */
     public Builder distributionMode(DistributionMode mode) {
-      Preconditions.checkArgument(
-          !DistributionMode.RANGE.equals(mode),
-          "Flink does not support 'range' write distribution mode now.");
       if (mode != null) {
         writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
       }
@@ -243,6 +246,62 @@
     }
 
     /**
+     * Range distribution needs to collect statistics about data distribution to properly shuffle
+     * the records in relatively balanced way. In general, low cardinality should use {@link
+     * StatisticsType#Map} and high cardinality should use {@link StatisticsType#Sketch} Refer to
+     * {@link StatisticsType} Javadoc for more details.
+     *
+     * <p>Default is {@link StatisticsType#Auto} where initially Map statistics is used. But if
+     * cardinality is higher than the threshold (currently 10K) as defined in {@code
+     * SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD}, statistics collection automatically switches to
+     * the sketch reservoir sampling.
+     *
+     * <p>Explicit set the statistics type if the default behavior doesn't work.
+     *
+     * @param type to specify the statistics type for range distribution.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder rangeDistributionStatisticsType(StatisticsType type) {
+      if (type != null) {
+        writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name());
+      }
+      return this;
+    }
+
+    /**
+     * If sort order contains partition columns, each sort key would map to one partition and data
+     * file. This relative weight can avoid placing too many small files for sort keys with low
+     * traffic. It is a double value that defines the minimal weight for each sort key. `0.02` means
+     * each key has a base weight of `2%` of the targeted traffic weight per writer task.
+     *
+     * <p>E.g. the sink Iceberg table is partitioned daily by event time. Assume the data stream
+     * contains events from now up to 180 days ago. With event time, traffic weight distribution
+     * across different days typically has a long tail pattern. Current day contains the most
+     * traffic. The older days (long tail) contain less and less traffic. Assume writer parallelism
+     * is `10`. The total weight across all 180 days is `10,000`. Target traffic weight per writer
+     * task would be `1,000`. Assume the weight sum for the oldest 150 days is `1,000`. Normally,
+     * the range partitioner would put all the oldest 150 days in one writer task. That writer task
+     * would write to 150 small files (one per day). Keeping 150 open files can potentially consume
+     * large amount of memory. Flushing and uploading 150 files (however small) at checkpoint time
+     * can also be potentially slow. If this config is set to `0.02`. It means every sort key has a
+     * base weight of `2%` of targeted weight of `1,000` for every write task. It would essentially
+     * avoid placing more than `50` data files (one per day) on one writer task no matter how small
+     * they are.
+     *
+     * <p>This is only applicable to {@link StatisticsType#Map} for low-cardinality scenario. For
+     * {@link StatisticsType#Sketch} high-cardinality sort columns, they are usually not used as
+     * partition columns. Otherwise, too many partitions and small files may be generated during
+     * write. Sketch range partitioner simply splits high-cardinality keys into ordered ranges.
+     *
+     * <p>Default is {@code 0.0%}.
+     */
+    public Builder rangeDistributionSortKeyBaseWeight(double weight) {
+      writeOptions.put(
+          FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key(), Double.toString(weight));
+      return this;
+    }
+
+    /**
      * Configuring the write parallel number for iceberg stream writer.
      *
      * @param newWriteParallelism the number of parallel iceberg stream writer.
@@ -349,18 +408,20 @@
       // Find out the equality field id list based on the user-provided equality field column names.
       List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
 
-      // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+      int writerParallelism =
+          flinkWriteConf.writeParallelism() == null
+              ? rowDataInput.getParallelism()
+              : flinkWriteConf.writeParallelism();
 
       // Distribute the records from input data stream based on the write.distribution-mode and
       // equality fields.
       DataStream<RowData> distributeStream =
-          distributeDataStream(
-              rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
+          distributeDataStream(rowDataInput, equalityFieldIds, flinkRowType, writerParallelism);
 
       // Add parallel writers that append rows to files
       SingleOutputStreamOperator<WriteResult> writerStream =
-          appendWriter(distributeStream, flinkRowType, equalityFieldIds);
+          appendWriter(distributeStream, flinkRowType, equalityFieldIds, writerParallelism);
 
       // Add single-parallelism committer that commits files
       // after successful checkpoint or end of input
@@ -447,7 +508,10 @@
     }
 
     private SingleOutputStreamOperator<WriteResult> appendWriter(
-        DataStream<RowData> input, RowType flinkRowType, List<Integer> equalityFieldIds) {
+        DataStream<RowData> input,
+        RowType flinkRowType,
+        List<Integer> equalityFieldIds,
+        int writerParallelism) {
       // Validate the equality fields and partition fields if we enable the upsert mode.
       if (flinkWriteConf.upsertMode()) {
         Preconditions.checkState(
@@ -481,17 +545,13 @@
       IcebergStreamWriter<RowData> streamWriter =
           createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds);
 
-      int parallelism =
-          flinkWriteConf.writeParallelism() == null
-              ? input.getParallelism()
-              : flinkWriteConf.writeParallelism();
       SingleOutputStreamOperator<WriteResult> writerStream =
           input
               .transform(
                   operatorName(ICEBERG_STREAM_WRITER_NAME),
                   TypeInformation.of(WriteResult.class),
                   streamWriter)
-              .setParallelism(parallelism);
+              .setParallelism(writerParallelism);
       if (uidPrefix != null) {
         writerStream = writerStream.uid(uidPrefix + "-writer");
       }
@@ -501,12 +561,15 @@
     private DataStream<RowData> distributeDataStream(
         DataStream<RowData> input,
         List<Integer> equalityFieldIds,
-        PartitionSpec partitionSpec,
-        Schema iSchema,
-        RowType flinkRowType) {
+        RowType flinkRowType,
+        int writerParallelism) {
       DistributionMode writeMode = flinkWriteConf.distributionMode();
-
       LOG.info("Write distribution mode is '{}'", writeMode.modeName());
+
+      Schema iSchema = table.schema();
+      PartitionSpec partitionSpec = table.spec();
+      SortOrder sortOrder = table.sortOrder();
+
       switch (writeMode) {
         case NONE:
           if (equalityFieldIds.isEmpty()) {
@@ -548,21 +611,52 @@
           }
 
         case RANGE:
-          if (equalityFieldIds.isEmpty()) {
+          // Ideally, exception should be thrown in the combination of range distribution and
+          // equality fields. Primary key case should use hash distribution mode.
+          // Keep the current behavior of falling back to keyBy for backward compatibility.
+          if (!equalityFieldIds.isEmpty()) {
             LOG.warn(
-                "Fallback to use 'none' distribution mode, because there are no equality fields set "
-                    + "and {}=range is not supported yet in flink",
-                WRITE_DISTRIBUTION_MODE);
-            return input;
-          } else {
-            LOG.info(
-                "Distribute rows by equality fields, because there are equality fields set "
-                    + "and{}=range is not supported yet in flink",
+                "Hash distribute rows by equality fields, even though {}=range is set. "
+                    + "Range distribution for primary keys are not always safe in "
+                    + "Flink streaming writer.",
                 WRITE_DISTRIBUTION_MODE);
             return input.keyBy(
                 new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
           }
 
+          // range distribute by partition key or sort key if table has an SortOrder
+          Preconditions.checkState(
+              sortOrder.isSorted() || partitionSpec.isPartitioned(),
+              "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+          if (sortOrder.isUnsorted()) {
+            sortOrder = Partitioning.sortOrderFor(partitionSpec);
+            LOG.info("Construct sort order from partition spec");
+          }
+
+          LOG.info("Range distribute rows by sort order: {}", sortOrder);
+          StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
+          SingleOutputStreamOperator<StatisticsOrRecord> shuffleStream =
+              input
+                  .transform(
+                      operatorName("range-shuffle"),
+                      TypeInformation.of(StatisticsOrRecord.class),
+                      new DataStatisticsOperatorFactory(
+                          iSchema,
+                          sortOrder,
+                          writerParallelism,
+                          statisticsType,
+                          flinkWriteConf.rangeDistributionSortKeyBaseWeight()))
+                  // Set the parallelism same as input operator to encourage chaining
+                  .setParallelism(input.getParallelism());
+          if (uidPrefix != null) {
+            shuffleStream = shuffleStream.uid(uidPrefix + "-shuffle");
+          }
+
+          return shuffleStream
+              .partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r)
+              .filter(StatisticsOrRecord::hasRecord)
+              .map(StatisticsOrRecord::record);
+
         default:
           throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
       }
@@ -577,12 +671,9 @@
       TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
 
       // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will
-      // be promoted to
-      // iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1
-      // 'byte'), we will
-      // read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here
-      // we must use flink
-      // schema.
+      // be promoted to iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT
+      // (backend by 1 'byte'), we will read 4 bytes rather than 1 byte, it will mess up the
+      // byte array in BinaryRowData. So here we must use flink schema.
       return (RowType) requestedSchema.toRowDataType().getLogicalType();
     } else {
       return FlinkSchemaUtil.convert(schema);
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java
new file mode 100644
index 0000000..dc147bf
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperatorFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+
+@Internal
+public class DataStatisticsOperatorFactory extends AbstractStreamOperatorFactory<StatisticsOrRecord>
+    implements CoordinatedOperatorFactory<StatisticsOrRecord>,
+        OneInputStreamOperatorFactory<RowData, StatisticsOrRecord> {
+
+  private final Schema schema;
+  private final SortOrder sortOrder;
+  private final int downstreamParallelism;
+  private final StatisticsType type;
+  private final double closeFileCostWeightPercentage;
+
+  public DataStatisticsOperatorFactory(
+      Schema schema,
+      SortOrder sortOrder,
+      int downstreamParallelism,
+      StatisticsType type,
+      double closeFileCostWeightPercentage) {
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.downstreamParallelism = downstreamParallelism;
+    this.type = type;
+    this.closeFileCostWeightPercentage = closeFileCostWeightPercentage;
+  }
+
+  @Override
+  public OperatorCoordinator.Provider getCoordinatorProvider(
+      String operatorName, OperatorID operatorID) {
+    return new DataStatisticsCoordinatorProvider(
+        operatorName,
+        operatorID,
+        schema,
+        sortOrder,
+        downstreamParallelism,
+        type,
+        closeFileCostWeightPercentage);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends StreamOperator<StatisticsOrRecord>> T createStreamOperator(
+      StreamOperatorParameters<StatisticsOrRecord> parameters) {
+    OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+    String operatorName = parameters.getStreamConfig().getOperatorName();
+    OperatorEventGateway gateway =
+        parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+
+    DataStatisticsOperator rangeStatisticsOperator =
+        new DataStatisticsOperator(
+            operatorName, schema, sortOrder, gateway, downstreamParallelism, type);
+
+    rangeStatisticsOperator.setup(
+        parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+    parameters
+        .getOperatorEventDispatcher()
+        .registerEventHandler(operatorId, rangeStatisticsOperator);
+
+    return (T) rangeStatisticsOperator;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+    return DataStatisticsOperator.class;
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
index 482cfd1..b63547d 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
@@ -20,6 +20,7 @@
 
 import static org.apache.iceberg.flink.FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HADOOP;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.util.Arrays;
@@ -46,6 +47,7 @@
 import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Namespace;
@@ -54,6 +56,7 @@
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -241,4 +244,93 @@
       sql("DROP TABLE IF EXISTS %s.%s", FLINK_DATABASE, tableName);
     }
   }
+
+  @TestTemplate
+  public void testRangeDistributionPartitionColumn() {
+    // Range partitioner currently only works with streaming writes (with checkpoints)
+    assumeThat(isStreamingJob).isTrue();
+
+    // Initialize a BoundedSource table to precisely emit those rows in only one checkpoint.
+    List<List<Row>> rowsPerCheckpoint =
+        IntStream.range(1, 6)
+            .mapToObj(
+                checkpointId -> {
+                  List<Row> charRows = Lists.newArrayList();
+                  // emit 26x10 rows for each checkpoint cycle
+                  for (int i = 0; i < 10; ++i) {
+                    for (char c = 'a'; c <= 'z'; c++) {
+                      charRows.add(Row.of(c - 'a', String.valueOf(c)));
+                    }
+                  }
+                  return charRows;
+                })
+            .collect(Collectors.toList());
+    List<Row> flattenedRows =
+        rowsPerCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList());
+
+    String dataId = BoundedTableFactory.registerDataSet(rowsPerCheckpoint);
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    assertThat(sql("SELECT * FROM %s", SOURCE_TABLE))
+        .as("Should have the expected rows in source table.")
+        .containsExactlyInAnyOrderElementsOf(flattenedRows);
+
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            "write.format.default",
+            FileFormat.PARQUET.name(),
+            TableProperties.WRITE_DISTRIBUTION_MODE,
+            DistributionMode.RANGE.modeName());
+
+    String tableName = "test_hash_distribution_mode";
+    sql(
+        "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
+        tableName, toWithClause(tableProps));
+
+    try {
+      // Insert data set.
+      sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
+
+      assertThat(sql("SELECT * FROM %s", tableName))
+          .as("Should have the expected rows in sink table.")
+          .containsExactlyInAnyOrderElementsOf(flattenedRows);
+
+      Table table = catalog.loadTable(TableIdentifier.of(ICEBERG_NAMESPACE, tableName));
+      // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+      List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+      // only keep the snapshots with added data files
+      snapshots =
+          snapshots.stream()
+              .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+              .collect(Collectors.toList());
+
+      // Sometimes we will have more checkpoints than the bounded source if we pass the
+      // auto checkpoint interval. Thus producing multiple snapshots.
+      assertThat(snapshots).hasSizeGreaterThanOrEqualTo(5);
+
+      // It takes 2 checkpoint cycle for statistics collection and application
+      // of the globally aggregated statistics in the range partitioner.
+      // The last two checkpoints should have range shuffle applied
+      List<Snapshot> rangePartitionedCycles =
+          snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+      for (Snapshot snapshot : rangePartitionedCycles) {
+        List<DataFile> addedDataFiles =
+            Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+        // range partition results in each partition only assigned to one writer task
+        // maybe less than 26 partitions as BoundedSource doesn't always precisely
+        // control the checkpoint boundary.
+        // It is hard to precisely control the test condition in SQL tests.
+        // Here only minimal safe assertions are applied to avoid flakiness.
+        // If there are no shuffling, the number of data files could be as high as
+        // 26 * 4 as the default parallelism is set to 4 for the mini cluster.
+        assertThat(addedDataFiles).hasSizeLessThanOrEqualTo(26);
+      }
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", FLINK_DATABASE, tableName);
+    }
+  }
 }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
index 75e397d..df8c3c7 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
@@ -20,28 +20,37 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
+import org.apache.iceberg.flink.source.BoundedTestSource;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -177,4 +186,309 @@
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid distribution mode: UNRECOGNIZED");
   }
+
+  @TestTemplate
+  public void testRangeDistributionWithoutSortOrderUnpartitioned() throws Exception {
+    assumeThat(partitioned).isFalse();
+
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =
+        env.addSource(
+            createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+            ROW_TYPE_INFO);
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .writeParallelism(parallelism);
+
+    // Range distribution requires either sort order or partition spec defined
+    assertThatThrownBy(builder::append)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage(
+            "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+  }
+
+  @TestTemplate
+  public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception {
+    assumeThat(partitioned).isTrue();
+
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =
+        env.addSource(
+            createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+            ROW_TYPE_INFO);
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .writeParallelism(parallelism);
+
+    // sort based on partition columns
+    builder.append();
+    env.execute(getClass().getSimpleName());
+
+    table.refresh();
+    // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+    // only keep the snapshots with added data files
+    snapshots =
+        snapshots.stream()
+            .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+            .collect(Collectors.toList());
+
+    // Sometimes we will have more checkpoints than the bounded source if we pass the
+    // auto checkpoint interval. Thus producing multiple snapshots.
+    assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+  }
+
+  @TestTemplate
+  public void testRangeDistributionWithSortOrder() throws Exception {
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+    table.replaceSortOrder().asc("data").commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =
+        env.addSource(
+            createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+            ROW_TYPE_INFO);
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .rangeDistributionStatisticsType(StatisticsType.Map)
+        .append();
+    env.execute(getClass().getSimpleName());
+
+    table.refresh();
+    // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+    // only keep the snapshots with added data files
+    snapshots =
+        snapshots.stream()
+            .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+            .collect(Collectors.toList());
+
+    // Sometimes we will have more checkpoints than the bounded source if we pass the
+    // auto checkpoint interval. Thus producing multiple snapshots.
+    assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+    // It takes 2 checkpoint cycle for statistics collection and application
+    // of the globally aggregated statistics in the range partitioner.
+    // The last two checkpoints should have range shuffle applied
+    List<Snapshot> rangePartitionedCycles =
+        snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+    if (partitioned) {
+      for (Snapshot snapshot : rangePartitionedCycles) {
+        List<DataFile> addedDataFiles =
+            Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+        // up to 26 partitions
+        assertThat(addedDataFiles).hasSizeLessThanOrEqualTo(26);
+      }
+    } else {
+      for (Snapshot snapshot : rangePartitionedCycles) {
+        List<DataFile> addedDataFiles =
+            Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+        // each writer task should only write one file for non-partition sort column
+        assertThat(addedDataFiles).hasSize(parallelism);
+        // verify there is no overlap in min-max stats range
+        if (parallelism == 2) {
+          assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+        }
+      }
+    }
+  }
+
+  @TestTemplate
+  public void testRangeDistributionSketchWithSortOrder() throws Exception {
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+    table.replaceSortOrder().asc("id").commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =
+        env.addSource(
+            createRangeDistributionBoundedSource(createIntRows(numOfCheckpoints, 1_000)),
+            ROW_TYPE_INFO);
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .rangeDistributionStatisticsType(StatisticsType.Sketch)
+        .append();
+    env.execute(getClass().getSimpleName());
+
+    table.refresh();
+    // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+    // only keep the snapshots with added data files
+    snapshots =
+        snapshots.stream()
+            .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+            .collect(Collectors.toList());
+
+    // Sometimes we will have more checkpoints than the bounded source if we pass the
+    // auto checkpoint interval. Thus producing multiple snapshots.
+    assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+    // It takes 2 checkpoint cycle for statistics collection and application
+    // of the globally aggregated statistics in the range partitioner.
+    // The last two checkpoints should have range shuffle applied
+    List<Snapshot> rangePartitionedCycles =
+        snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+    // since the input has a single value for the data column,
+    // it is always the same partition. Hence there is no difference
+    // for partitioned or not
+    for (Snapshot snapshot : rangePartitionedCycles) {
+      List<DataFile> addedDataFiles =
+          Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+      // each writer task should only write one file for non-partition sort column
+      assertThat(addedDataFiles).hasSize(parallelism);
+      // verify there is no overlap in min-max stats range
+      if (parallelism == 2) {
+        assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+      }
+    }
+  }
+
+  /** Test migration from Map stats to Sketch stats */
+  @TestTemplate
+  public void testRangeDistributionStatisticsMigration() throws Exception {
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+    table.replaceSortOrder().asc("id").commit();
+
+    int numOfCheckpoints = 4;
+    List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+    for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+      // checkpointId 2 would emit 11_000 records which is larger than
+      // the OPERATOR_SKETCH_SWITCH_THRESHOLD of 10_000.
+      // This should trigger the stats migration.
+      int maxId = checkpointId < 1 ? 1_000 : 11_000;
+      List<Row> rows = Lists.newArrayListWithCapacity(maxId);
+      for (int j = 0; j < maxId; ++j) {
+        // fixed value "a" for the data (possible partition column)
+        rows.add(Row.of(j, "a"));
+      }
+
+      rowsPerCheckpoint.add(rows);
+    }
+
+    DataStream<Row> dataStream =
+        env.addSource(createRangeDistributionBoundedSource(rowsPerCheckpoint), ROW_TYPE_INFO);
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .rangeDistributionStatisticsType(StatisticsType.Auto)
+        .append();
+    env.execute(getClass().getSimpleName());
+
+    table.refresh();
+    // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+    // only keep the snapshots with added data files
+    snapshots =
+        snapshots.stream()
+            .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+            .collect(Collectors.toList());
+
+    // Sometimes we will have more checkpoints than the bounded source if we pass the
+    // auto checkpoint interval. Thus producing multiple snapshots.
+    assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+
+    // It takes 2 checkpoint cycle for statistics collection and application
+    // of the globally aggregated statistics in the range partitioner.
+    // The last two checkpoints should have range shuffle applied
+    List<Snapshot> rangePartitionedCycles =
+        snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+    // since the input has a single value for the data column,
+    // it is always the same partition. Hence there is no difference
+    // for partitioned or not
+    for (Snapshot snapshot : rangePartitionedCycles) {
+      List<DataFile> addedDataFiles =
+          Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+      // each writer task should only write one file for non-partition sort column
+      // sometimes
+      assertThat(addedDataFiles).hasSize(parallelism);
+      // verify there is no overlap in min-max stats range
+      if (parallelism == 2) {
+        assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
+      }
+    }
+  }
+
+  private BoundedTestSource<Row> createRangeDistributionBoundedSource(
+      List<List<Row>> rowsPerCheckpoint) {
+    return new BoundedTestSource<>(rowsPerCheckpoint);
+  }
+
+  private List<List<Row>> createCharRows(int numOfCheckpoints, int countPerChar) {
+    List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+    for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+      List<Row> rows = Lists.newArrayListWithCapacity(26 * countPerChar);
+      for (int j = 0; j < countPerChar; ++j) {
+        for (char c = 'a'; c <= 'z'; ++c) {
+          rows.add(Row.of(1, String.valueOf(c)));
+        }
+      }
+
+      rowsPerCheckpoint.add(rows);
+    }
+
+    return rowsPerCheckpoint;
+  }
+
+  private List<List<Row>> createIntRows(int numOfCheckpoints, int maxId) {
+    List<List<Row>> rowsPerCheckpoint = Lists.newArrayListWithCapacity(numOfCheckpoints);
+    for (int checkpointId = 0; checkpointId < numOfCheckpoints; ++checkpointId) {
+      List<Row> rows = Lists.newArrayListWithCapacity(maxId);
+      for (int j = 0; j < maxId; ++j) {
+        // fixed value "a" for the data (possible partition column)
+        rows.add(Row.of(j, "a"));
+      }
+
+      rowsPerCheckpoint.add(rows);
+    }
+
+    return rowsPerCheckpoint;
+  }
+
+  private void assertIdColumnStatsNoRangeOverlap(DataFile file1, DataFile file2) {
+    // id column has fieldId 1
+    int file1LowerBound =
+        Conversions.fromByteBuffer(Types.IntegerType.get(), file1.lowerBounds().get(1));
+    int file1UpperBound =
+        Conversions.fromByteBuffer(Types.IntegerType.get(), file1.upperBounds().get(1));
+    int file2LowerBound =
+        Conversions.fromByteBuffer(Types.IntegerType.get(), file2.lowerBounds().get(1));
+    int file2UpperBound =
+        Conversions.fromByteBuffer(Types.IntegerType.get(), file2.upperBounds().get(1));
+
+    if (file1LowerBound < file2LowerBound) {
+      assertThat(file1UpperBound).isLessThanOrEqualTo(file2LowerBound);
+    } else {
+      assertThat(file2UpperBound).isLessThanOrEqualTo(file1LowerBound);
+    }
+  }
 }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 577c549..b283b83 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -30,6 +30,7 @@
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ParameterizedTestExtension;
@@ -184,11 +185,21 @@
         .hasMessage(
             "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
 
-    assertThatThrownBy(
-            () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
-        .isInstanceOf(IllegalStateException.class)
-        .hasMessage(
-            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+    if (writeDistributionMode.equals(DistributionMode.RANGE.modeName()) && !partitioned) {
+      // validation error thrown from distributeDataStream
+      assertThatThrownBy(
+              () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+          .isInstanceOf(IllegalStateException.class)
+          .hasMessage(
+              "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+    } else {
+      // validation error thrown from appendWriter
+      assertThatThrownBy(
+              () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+          .isInstanceOf(IllegalStateException.class)
+          .hasMessage(
+              "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+    }
   }
 
   @TestTemplate