[core] Support writing blob file for rolling. (#6340)

diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
index 04be878..7f3fa01 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
@@ -19,6 +19,10 @@
 package org.apache.paimon.types;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Data type of binary large object.
@@ -61,4 +65,21 @@
     public <R> R accept(DataTypeVisitor<R> visitor) {
         return visitor.visit(this);
     }
+
+    public static Pair<RowType, RowType> splitBlob(RowType rowType) {
+        List<DataField> fields = rowType.getFields();
+        List<DataField> normalFields = new ArrayList<>();
+        List<DataField> blobFields = new ArrayList<>();
+
+        for (DataField field : fields) {
+            DataTypeRoot type = field.type().getTypeRoot();
+            if (type == DataTypeRoot.BLOB) {
+                blobFields.add(field);
+            } else {
+                normalFields.add(field);
+            }
+        }
+
+        return Pair.of(new RowType(normalFields), new RowType(blobFields));
+    }
 }
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
index 014f85a..e592ff7 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
@@ -286,13 +286,6 @@
         }
     }
 
-    public RowType appendDataField(String name, DataType type) {
-        List<DataField> newFields = new ArrayList<>(fields);
-        int newId = currentHighestFieldId(fields) + 1;
-        newFields.add(new DataField(newId, name, type));
-        return new RowType(newFields);
-    }
-
     public RowType project(int[] mapping) {
         List<DataField> fields = getFields();
         return new RowType(
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/PeojectedFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/PeojectedFileWriter.java
new file mode 100644
index 0000000..db9140d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/PeojectedFileWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.FileWriter;
+import org.apache.paimon.utils.ProjectedRow;
+
+import java.io.IOException;
+
+/**
+ * A delegating {@link FileWriter} which applies a field projection to each incoming {@link
+ * InternalRow} before forwarding it to the underlying writer.
+ *
+ * <p>This is useful when the physical file schema is a subset of the logical write schema. The
+ * projection is evaluated via {@link ProjectedRow} to avoid object allocations.
+ */
+public class PeojectedFileWriter<T extends FileWriter<InternalRow, R>, R>
+        implements FileWriter<InternalRow, R> {
+
+    private final T writer;
+    private final ProjectedRow projectedRow;
+
+    public PeojectedFileWriter(T writer, int[] projection) {
+        this.writer = writer;
+        this.projectedRow = ProjectedRow.from(projection);
+    }
+
+    @Override
+    public void write(InternalRow record) throws IOException {
+        projectedRow.replaceRow(record);
+        writer.write(projectedRow);
+    }
+
+    @Override
+    public long recordCount() {
+        return writer.recordCount();
+    }
+
+    @Override
+    public void abort() {
+        writer.abort();
+    }
+
+    @Override
+    public R result() throws IOException {
+        return writer.result();
+    }
+
+    @Override
+    public void close() throws IOException {
+        writer.close();
+    }
+
+    public T writer() {
+        return writer;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
new file mode 100644
index 0000000..3464465
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RollingFileWriterImpl;
+import org.apache.paimon.io.RowDataFileWriter;
+import org.apache.paimon.io.SingleFileWriter;
+import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.BlobType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StatsCollectorFactories;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A rolling file writer that handles both normal data and blob data. This writer creates separate
+ * files for normal columns and blob columns, managing their lifecycle and ensuring consistency
+ * between them.
+ *
+ * <pre>
+ * For example,
+ * given a table schema with normal columns (id INT, name STRING) and a blob column (data BLOB),
+ * this writer will create separate files for (id, name) and (data).
+ * It will roll files based on the specified target file size, ensuring that both normal and blob
+ * files are rolled simultaneously.
+ *
+ * Every time a file is rolled, the writer will close the current normal data file and blob data files,
+ * so one normal data file may correspond to multiple blob data files.
+ *
+ * Normal file1: f1.parquet may including (b1.blob, b2.blob, b3.blob)
+ * Normal file2: f1-2.parquet may including (b4.blob, b5.blob)
+ *
+ * </pre>
+ */
+public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, DataFileMeta> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RollingBlobFileWriter.class);
+
+    /** Constant for checking rolling condition periodically. */
+    private static final long CHECK_ROLLING_RECORD_CNT = 1000L;
+
+    /** Expected number of blob fields in a table. */
+    private static final int EXPECTED_BLOB_FIELD_COUNT = 1;
+
+    // Core components
+    private final Supplier<
+                    PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
+            writerFactory;
+    private final PeojectedFileWriter<
+                    RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
+            blobWriter;
+    private final long targetFileSize;
+
+    // State management
+    private final List<AbortExecutor> closedWriters;
+    private final List<DataFileMeta> results;
+    private PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>
+            currentWriter;
+    private long recordCount = 0;
+    private boolean closed = false;
+
+    public RollingBlobFileWriter(
+            FileIO fileIO,
+            long schemaId,
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            DataFilePathFactory pathFactory,
+            LongCounter seqNumCounter,
+            String fileCompression,
+            StatsCollectorFactories statsCollectorFactories,
+            FileIndexOptions fileIndexOptions,
+            FileSource fileSource,
+            boolean asyncFileWrite,
+            boolean statsDenseStore) {
+
+        // Initialize basic fields
+        this.targetFileSize = targetFileSize;
+        this.results = new ArrayList<>();
+        this.closedWriters = new ArrayList<>();
+
+        // Split schema into normal and blob parts
+        Pair<RowType, RowType> typeWithBlob = BlobType.splitBlob(writeSchema);
+        RowType normalRowType = typeWithBlob.getLeft();
+        RowType blobType = typeWithBlob.getRight();
+
+        // Initialize writer factory for normal data
+        this.writerFactory =
+                createNormalWriterFactory(
+                        fileIO,
+                        schemaId,
+                        fileFormat,
+                        normalRowType,
+                        writeSchema,
+                        pathFactory,
+                        seqNumCounter,
+                        fileCompression,
+                        statsCollectorFactories,
+                        fileIndexOptions,
+                        fileSource,
+                        asyncFileWrite,
+                        statsDenseStore);
+
+        // Initialize blob writer
+        this.blobWriter =
+                createBlobWriter(
+                        fileIO,
+                        schemaId,
+                        blobType,
+                        writeSchema,
+                        pathFactory,
+                        seqNumCounter,
+                        fileSource,
+                        asyncFileWrite,
+                        statsDenseStore,
+                        targetFileSize);
+    }
+
+    /** Creates a factory for normal data writers. */
+    private Supplier<PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
+            createNormalWriterFactory(
+                    FileIO fileIO,
+                    long schemaId,
+                    FileFormat fileFormat,
+                    RowType normalRowType,
+                    RowType writeSchema,
+                    DataFilePathFactory pathFactory,
+                    LongCounter seqNumCounter,
+                    String fileCompression,
+                    StatsCollectorFactories statsCollectorFactories,
+                    FileIndexOptions fileIndexOptions,
+                    FileSource fileSource,
+                    boolean asyncFileWrite,
+                    boolean statsDenseStore) {
+
+        List<String> normalColumnNames = normalRowType.getFieldNames();
+        int[] projectionNormalFields = writeSchema.projectIndexes(normalColumnNames);
+
+        return () -> {
+            RowDataFileWriter rowDataFileWriter =
+                    new RowDataFileWriter(
+                            fileIO,
+                            RollingFileWriter.createFileWriterContext(
+                                    fileFormat,
+                                    normalRowType,
+                                    statsCollectorFactories.statsCollectors(normalColumnNames),
+                                    fileCompression),
+                            pathFactory.newPath(),
+                            normalRowType,
+                            schemaId,
+                            seqNumCounter,
+                            fileIndexOptions,
+                            fileSource,
+                            asyncFileWrite,
+                            statsDenseStore,
+                            pathFactory.isExternalPath(),
+                            normalColumnNames);
+            return new PeojectedFileWriter<>(rowDataFileWriter, projectionNormalFields);
+        };
+    }
+
+    /** Creates a blob writer for handling blob data. */
+    private PeojectedFileWriter<
+                    RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
+            createBlobWriter(
+                    FileIO fileIO,
+                    long schemaId,
+                    RowType blobType,
+                    RowType writeSchema,
+                    DataFilePathFactory pathFactory,
+                    LongCounter seqNumCounter,
+                    FileSource fileSource,
+                    boolean asyncFileWrite,
+                    boolean statsDenseStore,
+                    long targetFileSize) {
+
+        BlobFileFormat blobFileFormat = new BlobFileFormat();
+        List<String> blobNames = blobType.getFieldNames();
+
+        // Validate blob field count
+        checkArgument(
+                blobNames.size() == EXPECTED_BLOB_FIELD_COUNT,
+                "Limit exactly one blob fields in one paimon table yet.");
+
+        int[] blobProjection = writeSchema.projectIndexes(blobNames);
+        return new PeojectedFileWriter<>(
+                new RollingFileWriterImpl<>(
+                        () ->
+                                new RowDataFileWriter(
+                                        fileIO,
+                                        RollingFileWriter.createFileWriterContext(
+                                                blobFileFormat,
+                                                blobType,
+                                                new SimpleColStatsCollector.Factory[] {
+                                                    NoneSimpleColStatsCollector::new
+                                                },
+                                                "none"),
+                                        pathFactory.newBlobPath(),
+                                        blobType,
+                                        schemaId,
+                                        seqNumCounter,
+                                        new FileIndexOptions(),
+                                        fileSource,
+                                        asyncFileWrite,
+                                        statsDenseStore,
+                                        pathFactory.isExternalPath(),
+                                        blobNames),
+                        targetFileSize),
+                blobProjection);
+    }
+
+    /**
+     * Writes a single row to both normal and blob writers. Automatically handles file rolling when
+     * target size is reached.
+     *
+     * @param row The row to write
+     * @throws IOException if writing fails
+     */
+    @Override
+    public void write(InternalRow row) throws IOException {
+        try {
+            if (currentWriter == null) {
+                currentWriter = writerFactory.get();
+            }
+            currentWriter.write(row);
+            blobWriter.write(row);
+            recordCount++;
+
+            if (rollingFile()) {
+                closeCurrentWriter();
+            }
+        } catch (Throwable e) {
+            handleWriteException(e);
+            throw e;
+        }
+    }
+
+    /** Handles write exceptions by logging and cleaning up resources. */
+    private void handleWriteException(Throwable e) {
+        String filePath = (currentWriter == null) ? null : currentWriter.writer().path().toString();
+        LOG.warn("Exception occurs when writing file {}. Cleaning up.", filePath, e);
+        abort();
+    }
+
+    /**
+     * Writes a bundle of records by iterating through each row.
+     *
+     * @param bundle The bundle of records to write
+     * @throws IOException if writing fails
+     */
+    @Override
+    public void writeBundle(BundleRecords bundle) throws IOException {
+        // TODO: support bundle projection
+        for (InternalRow row : bundle) {
+            write(row);
+        }
+    }
+
+    /**
+     * Returns the total number of records written.
+     *
+     * @return the record count
+     */
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    /**
+     * Aborts all writers and cleans up resources. This method should be called when an error occurs
+     * during writing.
+     */
+    @Override
+    public void abort() {
+        if (currentWriter != null) {
+            currentWriter.abort();
+            currentWriter = null;
+        }
+        for (AbortExecutor abortExecutor : closedWriters) {
+            abortExecutor.abort();
+        }
+        blobWriter.abort();
+    }
+
+    /** Checks if the current file should be rolled based on size and record count. */
+    private boolean rollingFile() throws IOException {
+        return currentWriter
+                .writer()
+                .reachTargetSize(recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize);
+    }
+
+    /**
+     * Closes the current writer and processes the results. Validates consistency between main and
+     * blob files.
+     *
+     * @throws IOException if closing fails
+     */
+    private void closeCurrentWriter() throws IOException {
+        if (currentWriter == null) {
+            return;
+        }
+
+        // Close main writer and get metadata
+        DataFileMeta mainDataFileMeta = closeMainWriter();
+
+        // Close blob writer and process blob metadata
+        List<DataFileMeta> blobMetas = closeBlobWriter();
+
+        // Validate consistency between main and blob files
+        validateFileConsistency(mainDataFileMeta, blobMetas);
+
+        // Add results to the results list
+        results.add(mainDataFileMeta);
+        results.addAll(blobMetas);
+
+        // Reset current writer
+        currentWriter = null;
+    }
+
+    /** Closes the main writer and returns its metadata. */
+    private DataFileMeta closeMainWriter() throws IOException {
+        currentWriter.close();
+        closedWriters.add(currentWriter.writer().abortExecutor());
+        return currentWriter.result();
+    }
+
+    /** Closes the blob writer and processes blob metadata with appropriate tags. */
+    private List<DataFileMeta> closeBlobWriter() throws IOException {
+        blobWriter.close();
+        return blobWriter.result();
+    }
+
+    /** Validates that the row counts match between main and blob files. */
+    private void validateFileConsistency(
+            DataFileMeta mainDataFileMeta, List<DataFileMeta> blobTaggedMetas) {
+        long mainRowCount = mainDataFileMeta.rowCount();
+        long blobRowCount = blobTaggedMetas.stream().mapToLong(DataFileMeta::rowCount).sum();
+
+        if (mainRowCount != blobRowCount) {
+            throw new IllegalStateException(
+                    String.format(
+                            "This is a bug: The row count of main file and blob files does not match. "
+                                    + "Main file: %s (row count: %d), blob files: %s (total row count: %d)",
+                            mainDataFileMeta, mainRowCount, blobTaggedMetas, blobRowCount));
+        }
+    }
+
+    /**
+     * Returns the list of file metadata for all written files. This method can only be called after
+     * the writer has been closed.
+     *
+     * @return list of file metadata
+     * @throws IllegalStateException if the writer is not closed
+     */
+    @Override
+    public List<DataFileMeta> result() {
+        Preconditions.checkState(closed, "Cannot access the results unless close all writers.");
+        return results;
+    }
+
+    /**
+     * Closes the writer and finalizes all files. This method ensures proper cleanup and validation
+     * of written data.
+     *
+     * @throws IOException if closing fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        try {
+            closeCurrentWriter();
+        } catch (IOException e) {
+            handleCloseException(e);
+            throw e;
+        } finally {
+            closed = true;
+        }
+    }
+
+    /** Handles exceptions that occur during closing. */
+    private void handleCloseException(IOException e) {
+        String filePath = (currentWriter == null) ? null : currentWriter.writer().path().toString();
+        LOG.warn("Exception occurs when writing file {}. Cleaning up.", filePath, e);
+        abort();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index bc5c577..2e05b25 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -31,7 +31,7 @@
 import org.apache.paimon.iceberg.IcebergPathFactory;
 import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
 import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
-import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RollingFileWriterImpl;
 import org.apache.paimon.io.SingleFileWriter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.MemorySize;
@@ -170,8 +170,8 @@
 
     public List<IcebergManifestFileMeta> rollingWrite(
             Iterator<IcebergManifestEntry> entries, long sequenceNumber, Content content) {
-        RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> writer =
-                new RollingFileWriter<>(
+        RollingFileWriterImpl<IcebergManifestEntry, IcebergManifestFileMeta> writer =
+                new RollingFileWriterImpl<>(
                         () -> createWriter(sequenceNumber, content), targetFileSize.getBytes());
         try {
             writer.write(entries);
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index 3baa9d2..e8ee221 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -82,6 +82,10 @@
         return newPath(dataFilePrefix);
     }
 
+    public Path newBlobPath() {
+        return newPathFromName(newFileName(dataFilePrefix, ".blob"));
+    }
+
     public Path newChangelogPath() {
         return newPath(changelogFilePrefix);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 7afbd84..613beda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -97,7 +97,7 @@
     public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(
             int level, FileSource fileSource) {
         WriteFormatKey key = new WriteFormatKey(level, false);
-        return new RollingFileWriter<>(
+        return new RollingFileWriterImpl<>(
                 () -> {
                     DataFilePathFactory pathFactory = formatContext.pathFactory(key);
                     return createDataFileWriter(
@@ -108,7 +108,7 @@
 
     public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {
         WriteFormatKey key = new WriteFormatKey(level, true);
-        return new RollingFileWriter<>(
+        return new RollingFileWriterImpl<>(
                 () -> {
                     DataFilePathFactory pathFactory = formatContext.pathFactory(key);
                     return createDataFileWriter(
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
index 29b9223..18846bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
@@ -19,160 +19,58 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
-import org.apache.paimon.utils.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.SimpleStatsCollector;
+import org.apache.paimon.format.avro.AvroFileFormat;
+import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.RowType;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.function.Supplier;
 
 /**
- * Writer to roll over to a new file if the current size exceed the target file size.
+ * A file writer that automatically rolls over to a new file when a target size is reached and
+ * supports writing bundles of records.
  *
- * @param <T> record data type.
- * @param <R> the file metadata result.
+ * <p>This interface also provides utilities to construct a {@link FileWriterContext} with
+ * appropriate statistics collection (collector-based for Avro, extractor-based for others), so
+ * implementations can focus on rolling logic and I/O.
  */
-public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
+public interface RollingFileWriter<T, R> extends FileWriter<T, List<R>> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriter.class);
+    int CHECK_ROLLING_RECORD_CNT = 1000;
 
-    private static final int CHECK_ROLLING_RECORD_CNT = 1000;
-
-    private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
-    private final long targetFileSize;
-    private final List<AbortExecutor> closedWriters;
-    private final List<R> results;
-
-    private SingleFileWriter<T, R> currentWriter = null;
-    private long recordCount = 0;
-    private boolean closed = false;
-
-    public RollingFileWriter(
-            Supplier<? extends SingleFileWriter<T, R>> writerFactory, long targetFileSize) {
-        this.writerFactory = writerFactory;
-        this.targetFileSize = targetFileSize;
-        this.results = new ArrayList<>();
-        this.closedWriters = new ArrayList<>();
-    }
+    void writeBundle(BundleRecords records) throws IOException;
 
     @VisibleForTesting
-    public long targetFileSize() {
-        return targetFileSize;
+    static FileWriterContext createFileWriterContext(
+            FileFormat fileFormat,
+            RowType rowType,
+            SimpleColStatsCollector.Factory[] statsCollectors,
+            String fileCompression) {
+        return new FileWriterContext(
+                fileFormat.createWriterFactory(rowType),
+                createStatsProducer(fileFormat, rowType, statsCollectors),
+                fileCompression);
     }
 
-    private boolean rollingFile(boolean forceCheck) throws IOException {
-        return currentWriter.reachTargetSize(
-                forceCheck || recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize);
-    }
-
-    @Override
-    public void write(T row) throws IOException {
-        try {
-            // Open the current writer if write the first record or roll over happen before.
-            if (currentWriter == null) {
-                openCurrentWriter();
-            }
-
-            currentWriter.write(row);
-            recordCount += 1;
-
-            if (rollingFile(false)) {
-                closeCurrentWriter();
-            }
-        } catch (Throwable e) {
-            LOG.warn(
-                    "Exception occurs when writing file "
-                            + (currentWriter == null ? null : currentWriter.path())
-                            + ". Cleaning up.",
-                    e);
-            abort();
-            throw e;
+    static SimpleStatsProducer createStatsProducer(
+            FileFormat fileFormat,
+            RowType rowType,
+            SimpleColStatsCollector.Factory[] statsCollectors) {
+        boolean isDisabled =
+                Arrays.stream(SimpleColStatsCollector.create(statsCollectors))
+                        .allMatch(p -> p instanceof NoneSimpleColStatsCollector);
+        if (isDisabled) {
+            return SimpleStatsProducer.disabledProducer();
         }
-    }
-
-    public void writeBundle(BundleRecords bundle) throws IOException {
-        try {
-            // Open the current writer if write the first record or roll over happen before.
-            if (currentWriter == null) {
-                openCurrentWriter();
-            }
-
-            currentWriter.writeBundle(bundle);
-            recordCount += bundle.rowCount();
-
-            if (rollingFile(true)) {
-                closeCurrentWriter();
-            }
-        } catch (Throwable e) {
-            LOG.warn(
-                    "Exception occurs when writing file "
-                            + (currentWriter == null ? null : currentWriter.path())
-                            + ". Cleaning up.",
-                    e);
-            abort();
-            throw e;
+        if (fileFormat instanceof AvroFileFormat) {
+            SimpleStatsCollector collector = new SimpleStatsCollector(rowType, statsCollectors);
+            return SimpleStatsProducer.fromCollector(collector);
         }
-    }
-
-    private void openCurrentWriter() {
-        currentWriter = writerFactory.get();
-    }
-
-    private void closeCurrentWriter() throws IOException {
-        if (currentWriter == null) {
-            return;
-        }
-
-        currentWriter.close();
-        // only store abort executor in memory
-        // cannot store whole writer, it includes lots of memory for example column vectors to read
-        // and write
-        closedWriters.add(currentWriter.abortExecutor());
-        results.add(currentWriter.result());
-        currentWriter = null;
-    }
-
-    @Override
-    public long recordCount() {
-        return recordCount;
-    }
-
-    @Override
-    public void abort() {
-        if (currentWriter != null) {
-            currentWriter.abort();
-        }
-        for (AbortExecutor abortExecutor : closedWriters) {
-            abortExecutor.abort();
-        }
-    }
-
-    @Override
-    public List<R> result() {
-        Preconditions.checkState(closed, "Cannot access the results unless close all writers.");
-        return results;
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (closed) {
-            return;
-        }
-
-        try {
-            closeCurrentWriter();
-        } catch (IOException e) {
-            LOG.warn(
-                    "Exception occurs when writing file " + currentWriter.path() + ". Cleaning up.",
-                    e);
-            abort();
-            throw e;
-        } finally {
-            closed = true;
-        }
+        return SimpleStatsProducer.fromExtractor(
+                fileFormat.createStatsExtractor(rowType, statsCollectors).orElse(null));
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
new file mode 100644
index 0000000..76ee44b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file if the current size exceed the target file size.
+ *
+ * @param <T> record data type.
+ * @param <R> the file metadata result.
+ */
+public class RollingFileWriterImpl<T, R> implements RollingFileWriter<T, R> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriterImpl.class);
+
+    private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
+    private final long targetFileSize;
+    private final List<AbortExecutor> closedWriters;
+    protected final List<R> results;
+
+    private SingleFileWriter<T, R> currentWriter = null;
+    private long recordCount = 0;
+    private boolean closed = false;
+
+    public RollingFileWriterImpl(
+            Supplier<? extends SingleFileWriter<T, R>> writerFactory, long targetFileSize) {
+        this.writerFactory = writerFactory;
+        this.targetFileSize = targetFileSize;
+        this.results = new ArrayList<>();
+        this.closedWriters = new ArrayList<>();
+    }
+
+    @VisibleForTesting
+    public long targetFileSize() {
+        return targetFileSize;
+    }
+
+    private boolean rollingFile(boolean forceCheck) throws IOException {
+        return currentWriter.reachTargetSize(
+                forceCheck || recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize);
+    }
+
+    @Override
+    public void write(T row) throws IOException {
+        try {
+            // Open the current writer if write the first record or roll over happen before.
+            if (currentWriter == null) {
+                openCurrentWriter();
+            }
+
+            currentWriter.write(row);
+            recordCount += 1;
+
+            if (rollingFile(false)) {
+                closeCurrentWriter();
+            }
+        } catch (Throwable e) {
+            LOG.warn(
+                    "Exception occurs when writing file "
+                            + (currentWriter == null ? null : currentWriter.path())
+                            + ". Cleaning up.",
+                    e);
+            abort();
+            throw e;
+        }
+    }
+
+    @Override
+    public void writeBundle(BundleRecords bundle) throws IOException {
+        try {
+            // Open the current writer if write the first record or roll over happen before.
+            if (currentWriter == null) {
+                openCurrentWriter();
+            }
+
+            currentWriter.writeBundle(bundle);
+            recordCount += bundle.rowCount();
+
+            if (rollingFile(true)) {
+                closeCurrentWriter();
+            }
+        } catch (Throwable e) {
+            LOG.warn(
+                    "Exception occurs when writing file "
+                            + (currentWriter == null ? null : currentWriter.path())
+                            + ". Cleaning up.",
+                    e);
+            abort();
+            throw e;
+        }
+    }
+
+    private void openCurrentWriter() {
+        currentWriter = writerFactory.get();
+    }
+
+    protected void closeCurrentWriter() throws IOException {
+        if (currentWriter == null) {
+            return;
+        }
+
+        currentWriter.close();
+        // only store abort executor in memory
+        // cannot store whole writer, it includes lots of memory for example column vectors to read
+        // and write
+        closedWriters.add(currentWriter.abortExecutor());
+        results.add(currentWriter.result());
+        currentWriter = null;
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public void abort() {
+        if (currentWriter != null) {
+            currentWriter.abort();
+        }
+        for (AbortExecutor abortExecutor : closedWriters) {
+            abortExecutor.abort();
+        }
+    }
+
+    @Override
+    public List<R> result() {
+        Preconditions.checkState(closed, "Cannot access the results unless close all writers.");
+        return results;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        try {
+            closeCurrentWriter();
+        } catch (IOException e) {
+            LOG.warn(
+                    "Exception occurs when writing file " + currentWriter.path() + ". Cleaning up.",
+                    e);
+            abort();
+            throw e;
+        } finally {
+            closed = true;
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 04b69d3..c309a10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -36,8 +36,8 @@
 import java.util.Arrays;
 import java.util.List;
 
-/** {@link RollingFileWriter} for data files containing {@link InternalRow}. */
-public class RowDataRollingFileWriter extends RollingFileWriter<InternalRow, DataFileMeta> {
+/** {@link RollingFileWriterImpl} for data files containing {@link InternalRow}. */
+public class RowDataRollingFileWriter extends RollingFileWriterImpl<InternalRow, DataFileMeta> {
 
     public RowDataRollingFileWriter(
             FileIO fileIO,
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 38d8e0a..1a68b15 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -27,6 +27,7 @@
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RollingFileWriterImpl;
 import org.apache.paimon.io.SingleFileWriter;
 import org.apache.paimon.operation.metrics.CacheMetrics;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -146,14 +147,14 @@
         try {
             writer.write(entries);
             writer.close();
+            return writer.result();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        return writer.result();
     }
 
     public RollingFileWriter<ManifestEntry, ManifestFileMeta> createRollingWriter() {
-        return new RollingFileWriter<>(
+        return new RollingFileWriterImpl<>(
                 () -> new ManifestEntryWriter(writerFactory, pathFactory.newPath(), compression),
                 suggestedFileSize);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
index 5d2e99e..62bfd0c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
@@ -35,6 +35,16 @@
 /** The stats utils to create {@link SimpleColStatsCollector.Factory}s. */
 public class StatsCollectorFactories {
 
+    private final CoreOptions options;
+
+    public StatsCollectorFactories(CoreOptions options) {
+        this.options = options;
+    }
+
+    public SimpleColStatsCollector.Factory[] statsCollectors(List<String> fieldNames) {
+        return createStatsFactories(options.statsMode(), options, fieldNames);
+    }
+
     public static SimpleColStatsCollector.Factory[] createStatsFactories(
             String statsMode, CoreOptions options, List<String> fields) {
         return createStatsFactories(statsMode, options, fields, Collections.emptyList());
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
new file mode 100644
index 0000000..fef2e59
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.StatsCollectorFactories;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RollingBlobFileWriter}. */
+public class RollingBlobFileWriterTest {
+
+    private static final RowType SCHEMA =
+            RowType.builder()
+                    .field("f0", DataTypes.INT())
+                    .field("f1", DataTypes.STRING())
+                    .field("f2", DataTypes.BLOB())
+                    .build();
+
+    private static final long TARGET_FILE_SIZE = 12 * 1024 * 1024L; // 12 MB
+    private static final long SCHEMA_ID = 1L;
+    private static final String COMPRESSION = "zstd";
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private RollingBlobFileWriter writer;
+    private DataFilePathFactory pathFactory;
+    private LongCounter seqNumCounter;
+    private byte[] testBlobData;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        // Create test blob data
+        testBlobData = new byte[1024 * 1024]; // 1 MB
+        new Random(42).nextBytes(testBlobData);
+
+        // Setup file system and path factory
+        LocalFileIO fileIO = LocalFileIO.create();
+        pathFactory =
+                new DataFilePathFactory(
+                        new Path(tempDir + "/bucket-0"),
+                        "parquet",
+                        "data",
+                        "changelog",
+                        false,
+                        null,
+                        null);
+        seqNumCounter = new LongCounter();
+
+        // Initialize the writer
+        writer =
+                new RollingBlobFileWriter(
+                        fileIO,
+                        SCHEMA_ID,
+                        FileFormat.fromIdentifier("parquet", new Options()),
+                        TARGET_FILE_SIZE,
+                        SCHEMA,
+                        pathFactory,
+                        seqNumCounter,
+                        COMPRESSION,
+                        new StatsCollectorFactories(new CoreOptions(new Options())),
+                        new FileIndexOptions(),
+                        FileSource.APPEND,
+                        false, // asyncFileWrite
+                        false // statsDenseStore
+                        );
+    }
+
+    @Test
+    public void testBasicWriting() throws IOException {
+        // Write a single row with blob data
+        InternalRow row =
+                GenericRow.of(1, BinaryString.fromString("test"), new BlobData(testBlobData));
+        writer.write(row);
+
+        assertThat(writer.recordCount()).isEqualTo(1);
+    }
+
+    @Test
+    public void testMultipleWrites() throws IOException {
+        // Write multiple rows
+        for (int i = 0; i < 36; i++) {
+            InternalRow row =
+                    GenericRow.of(
+                            i, BinaryString.fromString("test" + i), new BlobData(testBlobData));
+            writer.write(row);
+        }
+
+        writer.close();
+        List<DataFileMeta> metasResult = writer.result();
+
+        assertThat(metasResult.size()).isEqualTo(4);
+        assertThat(metasResult.get(0).fileFormat()).isEqualTo("parquet");
+        assertThat(metasResult.subList(1, 4)).allMatch(f -> f.fileFormat().equals("blob"));
+        assertThat(writer.recordCount()).isEqualTo(36);
+
+        assertThat(metasResult.get(0).rowCount())
+                .isEqualTo(
+                        metasResult.subList(1, 4).stream().mapToLong(DataFileMeta::rowCount).sum());
+    }
+
+    @Test
+    public void testBundleWriting() throws IOException {
+        // Create a bundle of records
+        List<InternalRow> rows =
+                Arrays.asList(
+                        GenericRow.of(
+                                1, BinaryString.fromString("test1"), new BlobData(testBlobData)),
+                        GenericRow.of(
+                                2, BinaryString.fromString("test2"), new BlobData(testBlobData)),
+                        GenericRow.of(
+                                3, BinaryString.fromString("test3"), new BlobData(testBlobData)));
+
+        // Write bundle
+        writer.writeBundle(new TestBundleRecords(rows));
+
+        assertThat(writer.recordCount()).isEqualTo(3);
+    }
+
+    @Test
+    public void testDoubleClose() throws IOException {
+        // Write some data
+        InternalRow row =
+                GenericRow.of(1, BinaryString.fromString("test"), new BlobData(testBlobData));
+        writer.write(row);
+
+        // Close twice - should not throw exception
+        writer.close();
+        writer.close();
+
+        // Should be able to get results
+        List<DataFileMeta> results = writer.result();
+        assertThat(results).isNotEmpty();
+    }
+
+    @Test
+    public void testSchemaValidation() throws IOException {
+        // Test that the writer correctly handles the schema with blob field
+        InternalRow row =
+                GenericRow.of(1, BinaryString.fromString("test"), new BlobData(testBlobData));
+        writer.write(row);
+        writer.close();
+
+        List<DataFileMeta> results = writer.result();
+
+        // Verify schema ID is set correctly
+        results.forEach(file -> assertThat(file.schemaId()).isEqualTo(SCHEMA_ID));
+    }
+
+    /** Simple implementation of BundleRecords for testing. */
+    private static class TestBundleRecords implements BundleRecords {
+        private final List<InternalRow> rows;
+
+        public TestBundleRecords(List<InternalRow> rows) {
+            this.rows = rows;
+        }
+
+        @Override
+        public java.util.Iterator<InternalRow> iterator() {
+            return rows.iterator();
+        }
+
+        @Override
+        public long rowCount() {
+            return rows.size();
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 02f096f..97d8638 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -101,8 +101,9 @@
         KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format);
         DataFileMetaSerializer serializer = new DataFileMetaSerializer();
 
-        RollingFileWriter<KeyValue, DataFileMeta> writer =
-                writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
+        RollingFileWriterImpl<KeyValue, DataFileMeta> writer =
+                (RollingFileWriterImpl<KeyValue, DataFileMeta>)
+                        writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
         writer.write(CloseableIterator.fromList(data.content, kv -> {}));
         writer.close();
         List<DataFileMeta> actualMetas = writer.result();
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 8e82a41..8ac16aa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -44,7 +44,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link RollingFileWriter}. */
+/** Test for {@link RollingFileWriterImpl}. */
 public class RollingFileWriterTest {
 
     private static final RowType SCHEMA =
@@ -58,7 +58,7 @@
 
     @TempDir java.nio.file.Path tempDir;
 
-    private RollingFileWriter<InternalRow, DataFileMeta> rollingFileWriter;
+    private RollingFileWriterImpl<InternalRow, DataFileMeta> rollingFileWriter;
 
     public void initialize(String identifier) {
         initialize(identifier, false);
@@ -67,7 +67,7 @@
     public void initialize(String identifier, boolean statsDenseStore) {
         FileFormat fileFormat = FileFormat.fromIdentifier(identifier, new Options());
         rollingFileWriter =
-                new RollingFileWriter<>(
+                new RollingFileWriterImpl<>(
                         () ->
                                 new RowDataFileWriter(
                                         LocalFileIO.create(),