[core] Support writing blob file for rolling. (#6340)
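
Introduces RollingBlobFileWriter, a rolling writer for tables with a BLOB column: the write
schema is split via BlobType#splitBlob, non-blob columns go to the regular data file while the
blob column goes to separate ".blob" files, and both sides are rolled together so one normal
data file may correspond to multiple blob files. RollingFileWriter becomes an interface, and
its previous implementation moves to RollingFileWriterImpl.
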
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
index 04be878..7f3fa01 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
@@ -19,6 +19,10 @@
package org.apache.paimon.types;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
/**
* Data type of binary large object.
@@ -61,4 +65,21 @@
public <R> R accept(DataTypeVisitor<R> visitor) {
return visitor.visit(this);
}
+
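+ /**
+ * Splits {@code rowType} into two row types: the left side keeps all non-BLOB fields and the
+ * right side keeps the BLOB fields, both preserving the original field order and field ids.
+ *
+ * <p>For example, {@code (id INT, name STRING, data BLOB)} is split into {@code (id, name)} on
+ * the left and {@code (data)} on the right.
+ */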
+ public static Pair<RowType, RowType> splitBlob(RowType rowType) {
+ List<DataField> fields = rowType.getFields();
+ List<DataField> normalFields = new ArrayList<>();
+ List<DataField> blobFields = new ArrayList<>();
+
+ for (DataField field : fields) {
+ DataTypeRoot type = field.type().getTypeRoot();
+ if (type == DataTypeRoot.BLOB) {
+ blobFields.add(field);
+ } else {
+ normalFields.add(field);
+ }
+ }
+
+ return Pair.of(new RowType(normalFields), new RowType(blobFields));
+ }
}
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
index 014f85a..e592ff7 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
@@ -286,13 +286,6 @@
}
}
- public RowType appendDataField(String name, DataType type) {
- List<DataField> newFields = new ArrayList<>(fields);
- int newId = currentHighestFieldId(fields) + 1;
- newFields.add(new DataField(newId, name, type));
- return new RowType(newFields);
- }
-
public RowType project(int[] mapping) {
List<DataField> fields = getFields();
return new RowType(
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java
new file mode 100644
index 0000000..db9140d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/ProjectedFileWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.FileWriter;
+import org.apache.paimon.utils.ProjectedRow;
+
+import java.io.IOException;
+
+/**
+ * A delegating {@link FileWriter} which applies a field projection to each incoming {@link
+ * InternalRow} before forwarding it to the underlying writer.
+ *
+ * <p>This is useful when the physical file schema is a subset of the logical write schema. The
+ * projection is evaluated via {@link ProjectedRow} to avoid object allocations.
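+ *
+ * <p>A minimal usage sketch (illustrative; {@code rowDataFileWriter} stands for any {@code
+ * SingleFileWriter<InternalRow, DataFileMeta>} created elsewhere):
+ *
+ * <pre>{@code
+ * // forward only fields 0 and 2 of each incoming row to the wrapped writer
+ * ProjectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta> writer =
+ *     new ProjectedFileWriter<>(rowDataFileWriter, new int[] {0, 2});
+ * writer.write(row);
+ * }</pre>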
+ */
+public class ProjectedFileWriter<T extends FileWriter<InternalRow, R>, R>
+ implements FileWriter<InternalRow, R> {
+
+ private final T writer;
+ private final ProjectedRow projectedRow;
+
+ public ProjectedFileWriter(T writer, int[] projection) {
+ this.writer = writer;
+ this.projectedRow = ProjectedRow.from(projection);
+ }
+
+ @Override
+ public void write(InternalRow record) throws IOException {
+ projectedRow.replaceRow(record);
+ writer.write(projectedRow);
+ }
+
+ @Override
+ public long recordCount() {
+ return writer.recordCount();
+ }
+
+ @Override
+ public void abort() {
+ writer.abort();
+ }
+
+ @Override
+ public R result() throws IOException {
+ return writer.result();
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ public T writer() {
+ return writer;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
new file mode 100644
index 0000000..3464465
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RollingFileWriterImpl;
+import org.apache.paimon.io.RowDataFileWriter;
+import org.apache.paimon.io.SingleFileWriter;
+import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.BlobType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StatsCollectorFactories;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A rolling file writer that handles both normal data and blob data. This writer creates separate
+ * files for normal columns and blob columns, managing their lifecycle and ensuring consistency
+ * between them.
+ *
+ * <pre>
+ * For example,
+ * given a table schema with normal columns (id INT, name STRING) and a blob column (data BLOB),
+ * this writer will create separate files for (id, name) and (data).
+ * It will roll files based on the specified target file size, ensuring that both normal and blob
+ * files are rolled simultaneously.
+ *
+ * Every time a file is rolled, the writer will close the current normal data file and blob data files,
+ * so one normal data file may correspond to multiple blob data files.
+ *
+ * Normal file1: f1.parquet may include (b1.blob, b2.blob, b3.blob)
+ * Normal file2: f1-2.parquet may include (b4.blob, b5.blob)
+ *
+ * </pre>
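+ *
+ * <p>A minimal usage sketch (writer construction elided; see RollingBlobFileWriterTest for a
+ * complete setup). The {@link #result()} list contains each normal file's metadata followed by
+ * the metadata of the blob files written alongside it:
+ *
+ * <pre>{@code
+ * // writer: a RollingBlobFileWriter built as in RollingBlobFileWriterTest
+ * for (InternalRow row : rows) {
+ *     writer.write(row); // rolls the normal file and its blob files together
+ * }
+ * writer.close();
+ * List<DataFileMeta> files = writer.result();
+ * }</pre>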
+ */
+public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, DataFileMeta> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RollingBlobFileWriter.class);
+
+ /** Constant for checking rolling condition periodically. */
+ private static final long CHECK_ROLLING_RECORD_CNT = 1000L;
+
+ /** Expected number of blob fields in a table. */
+ private static final int EXPECTED_BLOB_FIELD_COUNT = 1;
+
+ // Core components
+ private final Supplier<
+ ProjectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
+ writerFactory;
+ private final ProjectedFileWriter<
+ RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
+ blobWriter;
+ private final long targetFileSize;
+
+ // State management
+ private final List<AbortExecutor> closedWriters;
+ private final List<DataFileMeta> results;
+ private ProjectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>
+ currentWriter;
+ private long recordCount = 0;
+ private boolean closed = false;
+
+ public RollingBlobFileWriter(
+ FileIO fileIO,
+ long schemaId,
+ FileFormat fileFormat,
+ long targetFileSize,
+ RowType writeSchema,
+ DataFilePathFactory pathFactory,
+ LongCounter seqNumCounter,
+ String fileCompression,
+ StatsCollectorFactories statsCollectorFactories,
+ FileIndexOptions fileIndexOptions,
+ FileSource fileSource,
+ boolean asyncFileWrite,
+ boolean statsDenseStore) {
+
+ // Initialize basic fields
+ this.targetFileSize = targetFileSize;
+ this.results = new ArrayList<>();
+ this.closedWriters = new ArrayList<>();
+
+ // Split schema into normal and blob parts
+ Pair<RowType, RowType> typeWithBlob = BlobType.splitBlob(writeSchema);
+ RowType normalRowType = typeWithBlob.getLeft();
+ RowType blobType = typeWithBlob.getRight();
+
+ // Initialize writer factory for normal data
+ this.writerFactory =
+ createNormalWriterFactory(
+ fileIO,
+ schemaId,
+ fileFormat,
+ normalRowType,
+ writeSchema,
+ pathFactory,
+ seqNumCounter,
+ fileCompression,
+ statsCollectorFactories,
+ fileIndexOptions,
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore);
+
+ // Initialize blob writer
+ this.blobWriter =
+ createBlobWriter(
+ fileIO,
+ schemaId,
+ blobType,
+ writeSchema,
+ pathFactory,
+ seqNumCounter,
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ targetFileSize);
+ }
+
+ /** Creates a factory for normal data writers. */
+ private Supplier<ProjectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
+ createNormalWriterFactory(
+ FileIO fileIO,
+ long schemaId,
+ FileFormat fileFormat,
+ RowType normalRowType,
+ RowType writeSchema,
+ DataFilePathFactory pathFactory,
+ LongCounter seqNumCounter,
+ String fileCompression,
+ StatsCollectorFactories statsCollectorFactories,
+ FileIndexOptions fileIndexOptions,
+ FileSource fileSource,
+ boolean asyncFileWrite,
+ boolean statsDenseStore) {
+
+ List<String> normalColumnNames = normalRowType.getFieldNames();
+ int[] projectionNormalFields = writeSchema.projectIndexes(normalColumnNames);
+
+ return () -> {
+ RowDataFileWriter rowDataFileWriter =
+ new RowDataFileWriter(
+ fileIO,
+ RollingFileWriter.createFileWriterContext(
+ fileFormat,
+ normalRowType,
+ statsCollectorFactories.statsCollectors(normalColumnNames),
+ fileCompression),
+ pathFactory.newPath(),
+ normalRowType,
+ schemaId,
+ seqNumCounter,
+ fileIndexOptions,
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ pathFactory.isExternalPath(),
+ normalColumnNames);
+ return new ProjectedFileWriter<>(rowDataFileWriter, projectionNormalFields);
+ };
+ }
+
+ /** Creates a blob writer for handling blob data. */
+ private ProjectedFileWriter<
+ RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
+ createBlobWriter(
+ FileIO fileIO,
+ long schemaId,
+ RowType blobType,
+ RowType writeSchema,
+ DataFilePathFactory pathFactory,
+ LongCounter seqNumCounter,
+ FileSource fileSource,
+ boolean asyncFileWrite,
+ boolean statsDenseStore,
+ long targetFileSize) {
+
+ BlobFileFormat blobFileFormat = new BlobFileFormat();
+ List<String> blobNames = blobType.getFieldNames();
+
+ // Validate blob field count
+ checkArgument(
+ blobNames.size() == EXPECTED_BLOB_FIELD_COUNT,
+ "Limit exactly one blob fields in one paimon table yet.");
+
+ int[] blobProjection = writeSchema.projectIndexes(blobNames);
+ return new ProjectedFileWriter<>(
+ new RollingFileWriterImpl<>(
+ () ->
+ new RowDataFileWriter(
+ fileIO,
+ RollingFileWriter.createFileWriterContext(
+ blobFileFormat,
+ blobType,
+ new SimpleColStatsCollector.Factory[] {
+ NoneSimpleColStatsCollector::new
+ },
+ "none"),
+ pathFactory.newBlobPath(),
+ blobType,
+ schemaId,
+ seqNumCounter,
+ new FileIndexOptions(),
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ pathFactory.isExternalPath(),
+ blobNames),
+ targetFileSize),
+ blobProjection);
+ }
+
+ /**
+ * Writes a single row to both normal and blob writers. Automatically handles file rolling when
+ * target size is reached.
+ *
+ * @param row The row to write
+ * @throws IOException if writing fails
+ */
+ @Override
+ public void write(InternalRow row) throws IOException {
+ try {
+ if (currentWriter == null) {
+ currentWriter = writerFactory.get();
+ }
+ currentWriter.write(row);
+ blobWriter.write(row);
+ recordCount++;
+
+ if (rollingFile()) {
+ closeCurrentWriter();
+ }
+ } catch (Throwable e) {
+ handleWriteException(e);
+ throw e;
+ }
+ }
+
+ /** Handles write exceptions by logging and cleaning up resources. */
+ private void handleWriteException(Throwable e) {
+ String filePath = (currentWriter == null) ? null : currentWriter.writer().path().toString();
+ LOG.warn("Exception occurs when writing file {}. Cleaning up.", filePath, e);
+ abort();
+ }
+
+ /**
+ * Writes a bundle of records by iterating through each row.
+ *
+ * @param bundle The bundle of records to write
+ * @throws IOException if writing fails
+ */
+ @Override
+ public void writeBundle(BundleRecords bundle) throws IOException {
+ // TODO: support bundle projection
+ for (InternalRow row : bundle) {
+ write(row);
+ }
+ }
+
+ /**
+ * Returns the total number of records written.
+ *
+ * @return the record count
+ */
+ @Override
+ public long recordCount() {
+ return recordCount;
+ }
+
+ /**
+ * Aborts all writers and cleans up resources. This method should be called when an error occurs
+ * during writing.
+ */
+ @Override
+ public void abort() {
+ if (currentWriter != null) {
+ currentWriter.abort();
+ currentWriter = null;
+ }
+ for (AbortExecutor abortExecutor : closedWriters) {
+ abortExecutor.abort();
+ }
+ blobWriter.abort();
+ }
+
+ /** Checks if the current file should be rolled based on size and record count. */
+ private boolean rollingFile() throws IOException {
+ return currentWriter
+ .writer()
+ .reachTargetSize(recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize);
+ }
+
+ /**
+ * Closes the current writer and processes the results. Validates consistency between main and
+ * blob files.
+ *
+ * @throws IOException if closing fails
+ */
+ private void closeCurrentWriter() throws IOException {
+ if (currentWriter == null) {
+ return;
+ }
+
+ // Close main writer and get metadata
+ DataFileMeta mainDataFileMeta = closeMainWriter();
+
+ // Close blob writer and process blob metadata
+ List<DataFileMeta> blobMetas = closeBlobWriter();
+
+ // Validate consistency between main and blob files
+ validateFileConsistency(mainDataFileMeta, blobMetas);
+
+ // Record the main file meta first, followed by the metas of its blob files
+ results.add(mainDataFileMeta);
+ results.addAll(blobMetas);
+
+ // Reset current writer
+ currentWriter = null;
+ }
+
+ /** Closes the main writer and returns its metadata. */
+ private DataFileMeta closeMainWriter() throws IOException {
+ currentWriter.close();
+ closedWriters.add(currentWriter.writer().abortExecutor());
+ return currentWriter.result();
+ }
+
+ /** Closes the blob rolling writer and returns the metadata of the blob files it has produced. */
+ private List<DataFileMeta> closeBlobWriter() throws IOException {
+ blobWriter.close();
+ return blobWriter.result();
+ }
+
+ /** Validates that the row counts match between main and blob files. */
+ private void validateFileConsistency(
+ DataFileMeta mainDataFileMeta, List<DataFileMeta> blobTaggedMetas) {
+ long mainRowCount = mainDataFileMeta.rowCount();
+ long blobRowCount = blobTaggedMetas.stream().mapToLong(DataFileMeta::rowCount).sum();
+
+ if (mainRowCount != blobRowCount) {
+ throw new IllegalStateException(
+ String.format(
+ "This is a bug: The row count of main file and blob files does not match. "
+ + "Main file: %s (row count: %d), blob files: %s (total row count: %d)",
+ mainDataFileMeta, mainRowCount, blobTaggedMetas, blobRowCount));
+ }
+ }
+
+ /**
+ * Returns the list of file metadata for all written files. This method can only be called after
+ * the writer has been closed.
+ *
+ * @return list of file metadata
+ * @throws IllegalStateException if the writer is not closed
+ */
+ @Override
+ public List<DataFileMeta> result() {
+ Preconditions.checkState(closed, "Cannot access the results until all writers are closed.");
+ return results;
+ }
+
+ /**
+ * Closes the writer and finalizes all files. This method ensures proper cleanup and validation
+ * of written data.
+ *
+ * @throws IOException if closing fails
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ try {
+ closeCurrentWriter();
+ } catch (IOException e) {
+ handleCloseException(e);
+ throw e;
+ } finally {
+ closed = true;
+ }
+ }
+
+ /** Handles exceptions that occur during closing. */
+ private void handleCloseException(IOException e) {
+ String filePath = (currentWriter == null) ? null : currentWriter.writer().path().toString();
+ LOG.warn("Exception occurs when writing file {}. Cleaning up.", filePath, e);
+ abort();
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index bc5c577..2e05b25 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -31,7 +31,7 @@
import org.apache.paimon.iceberg.IcebergPathFactory;
import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
-import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RollingFileWriterImpl;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.MemorySize;
@@ -170,8 +170,8 @@
public List<IcebergManifestFileMeta> rollingWrite(
Iterator<IcebergManifestEntry> entries, long sequenceNumber, Content content) {
- RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> writer =
- new RollingFileWriter<>(
+ RollingFileWriterImpl<IcebergManifestEntry, IcebergManifestFileMeta> writer =
+ new RollingFileWriterImpl<>(
() -> createWriter(sequenceNumber, content), targetFileSize.getBytes());
try {
writer.write(entries);
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index 3baa9d2..e8ee221 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -82,6 +82,10 @@
return newPath(dataFilePrefix);
}
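+ /** Creates a path for a new blob data file, using the data file prefix and a ".blob" extension. */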
+ public Path newBlobPath() {
+ return newPathFromName(newFileName(dataFilePrefix, ".blob"));
+ }
+
public Path newChangelogPath() {
return newPath(changelogFilePrefix);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 7afbd84..613beda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -97,7 +97,7 @@
public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(
int level, FileSource fileSource) {
WriteFormatKey key = new WriteFormatKey(level, false);
- return new RollingFileWriter<>(
+ return new RollingFileWriterImpl<>(
() -> {
DataFilePathFactory pathFactory = formatContext.pathFactory(key);
return createDataFileWriter(
@@ -108,7 +108,7 @@
public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {
WriteFormatKey key = new WriteFormatKey(level, true);
- return new RollingFileWriter<>(
+ return new RollingFileWriterImpl<>(
() -> {
DataFilePathFactory pathFactory = formatContext.pathFactory(key);
return createDataFileWriter(
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
index 29b9223..18846bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
@@ -19,160 +19,58 @@
package org.apache.paimon.io;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
-import org.apache.paimon.utils.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.SimpleStatsCollector;
+import org.apache.paimon.format.avro.AvroFileFormat;
+import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.RowType;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import java.util.function.Supplier;
/**
- * Writer to roll over to a new file if the current size exceed the target file size.
+ * A file writer that automatically rolls over to a new file when a target size is reached and
+ * supports writing bundles of records.
*
- * @param <T> record data type.
- * @param <R> the file metadata result.
+ * <p>This interface also provides utilities to construct a {@link FileWriterContext} with
+ * appropriate statistics collection (collector-based for Avro, extractor-based for others), so
+ * implementations can focus on rolling logic and I/O.
*/
-public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
+public interface RollingFileWriter<T, R> extends FileWriter<T, List<R>> {
- private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriter.class);
+ int CHECK_ROLLING_RECORD_CNT = 1000;
- private static final int CHECK_ROLLING_RECORD_CNT = 1000;
-
- private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
- private final long targetFileSize;
- private final List<AbortExecutor> closedWriters;
- private final List<R> results;
-
- private SingleFileWriter<T, R> currentWriter = null;
- private long recordCount = 0;
- private boolean closed = false;
-
- public RollingFileWriter(
- Supplier<? extends SingleFileWriter<T, R>> writerFactory, long targetFileSize) {
- this.writerFactory = writerFactory;
- this.targetFileSize = targetFileSize;
- this.results = new ArrayList<>();
- this.closedWriters = new ArrayList<>();
- }
+ void writeBundle(BundleRecords records) throws IOException;
@VisibleForTesting
- public long targetFileSize() {
- return targetFileSize;
+ static FileWriterContext createFileWriterContext(
+ FileFormat fileFormat,
+ RowType rowType,
+ SimpleColStatsCollector.Factory[] statsCollectors,
+ String fileCompression) {
+ return new FileWriterContext(
+ fileFormat.createWriterFactory(rowType),
+ createStatsProducer(fileFormat, rowType, statsCollectors),
+ fileCompression);
}
- private boolean rollingFile(boolean forceCheck) throws IOException {
- return currentWriter.reachTargetSize(
- forceCheck || recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize);
- }
-
- @Override
- public void write(T row) throws IOException {
- try {
- // Open the current writer if write the first record or roll over happen before.
- if (currentWriter == null) {
- openCurrentWriter();
- }
-
- currentWriter.write(row);
- recordCount += 1;
-
- if (rollingFile(false)) {
- closeCurrentWriter();
- }
- } catch (Throwable e) {
- LOG.warn(
- "Exception occurs when writing file "
- + (currentWriter == null ? null : currentWriter.path())
- + ". Cleaning up.",
- e);
- abort();
- throw e;
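+ /**
+ * Chooses how simple column statistics are produced: disabled when every configured collector
+ * is a {@link NoneSimpleColStatsCollector}, collector-based for {@link AvroFileFormat}, and
+ * extractor-based for all other formats.
+ */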
+ static SimpleStatsProducer createStatsProducer(
+ FileFormat fileFormat,
+ RowType rowType,
+ SimpleColStatsCollector.Factory[] statsCollectors) {
+ boolean isDisabled =
+ Arrays.stream(SimpleColStatsCollector.create(statsCollectors))
+ .allMatch(p -> p instanceof NoneSimpleColStatsCollector);
+ if (isDisabled) {
+ return SimpleStatsProducer.disabledProducer();
}
- }
-
- public void writeBundle(BundleRecords bundle) throws IOException {
- try {
- // Open the current writer if write the first record or roll over happen before.
- if (currentWriter == null) {
- openCurrentWriter();
- }
-
- currentWriter.writeBundle(bundle);
- recordCount += bundle.rowCount();
-
- if (rollingFile(true)) {
- closeCurrentWriter();
- }
- } catch (Throwable e) {
- LOG.warn(
- "Exception occurs when writing file "
- + (currentWriter == null ? null : currentWriter.path())
- + ". Cleaning up.",
- e);
- abort();
- throw e;
+ if (fileFormat instanceof AvroFileFormat) {
+ SimpleStatsCollector collector = new SimpleStatsCollector(rowType, statsCollectors);
+ return SimpleStatsProducer.fromCollector(collector);
}
- }
-
- private void openCurrentWriter() {
- currentWriter = writerFactory.get();
- }
-
- private void closeCurrentWriter() throws IOException {
- if (currentWriter == null) {
- return;
- }
-
- currentWriter.close();
- // only store abort executor in memory
- // cannot store whole writer, it includes lots of memory for example column vectors to read
- // and write
- closedWriters.add(currentWriter.abortExecutor());
- results.add(currentWriter.result());
- currentWriter = null;
- }
-
- @Override
- public long recordCount() {
- return recordCount;
- }
-
- @Override
- public void abort() {
- if (currentWriter != null) {
- currentWriter.abort();
- }
- for (AbortExecutor abortExecutor : closedWriters) {
- abortExecutor.abort();
- }
- }
-
- @Override
- public List<R> result() {
- Preconditions.checkState(closed, "Cannot access the results unless close all writers.");
- return results;
- }
-
- @Override
- public void close() throws IOException {
- if (closed) {
- return;
- }
-
- try {
- closeCurrentWriter();
- } catch (IOException e) {
- LOG.warn(
- "Exception occurs when writing file " + currentWriter.path() + ". Cleaning up.",
- e);
- abort();
- throw e;
- } finally {
- closed = true;
- }
+ return SimpleStatsProducer.fromExtractor(
+ fileFormat.createStatsExtractor(rowType, statsCollectors).orElse(null));
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
new file mode 100644
index 0000000..76ee44b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file once the current file size exceeds the target file size. This
+ * is the default {@link RollingFileWriter} implementation.
+ *
+ * @param <T> record data type.
+ * @param <R> the file metadata result.
+ */
+public class RollingFileWriterImpl<T, R> implements RollingFileWriter<T, R> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriterImpl.class);
+
+ private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
+ private final long targetFileSize;
+ private final List<AbortExecutor> closedWriters;
+ protected final List<R> results;
+
+ private SingleFileWriter<T, R> currentWriter = null;
+ private long recordCount = 0;
+ private boolean closed = false;
+
+ public RollingFileWriterImpl(
+ Supplier<? extends SingleFileWriter<T, R>> writerFactory, long targetFileSize) {
+ this.writerFactory = writerFactory;
+ this.targetFileSize = targetFileSize;
+ this.results = new ArrayList<>();
+ this.closedWriters = new ArrayList<>();
+ }
+
+ @VisibleForTesting
+ public long targetFileSize() {
+ return targetFileSize;
+ }
+
+ private boolean rollingFile(boolean forceCheck) throws IOException {
+ return currentWriter.reachTargetSize(
+ forceCheck || recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize);
+ }
+
+ @Override
+ public void write(T row) throws IOException {
+ try {
+ // Open the current writer if write the first record or roll over happen before.
+ if (currentWriter == null) {
+ openCurrentWriter();
+ }
+
+ currentWriter.write(row);
+ recordCount += 1;
+
+ if (rollingFile(false)) {
+ closeCurrentWriter();
+ }
+ } catch (Throwable e) {
+ LOG.warn(
+ "Exception occurs when writing file "
+ + (currentWriter == null ? null : currentWriter.path())
+ + ". Cleaning up.",
+ e);
+ abort();
+ throw e;
+ }
+ }
+
+ @Override
+ public void writeBundle(BundleRecords bundle) throws IOException {
+ try {
+ // Open the current writer if write the first record or roll over happen before.
+ if (currentWriter == null) {
+ openCurrentWriter();
+ }
+
+ currentWriter.writeBundle(bundle);
+ recordCount += bundle.rowCount();
+
+ if (rollingFile(true)) {
+ closeCurrentWriter();
+ }
+ } catch (Throwable e) {
+ LOG.warn(
+ "Exception occurs when writing file "
+ + (currentWriter == null ? null : currentWriter.path())
+ + ". Cleaning up.",
+ e);
+ abort();
+ throw e;
+ }
+ }
+
+ private void openCurrentWriter() {
+ currentWriter = writerFactory.get();
+ }
+
+ protected void closeCurrentWriter() throws IOException {
+ if (currentWriter == null) {
+ return;
+ }
+
+ currentWriter.close();
+ // only keep the abort executor in memory; keeping the whole writer would retain a lot of
+ // memory, e.g. the column vectors used for reading and writing
+ closedWriters.add(currentWriter.abortExecutor());
+ results.add(currentWriter.result());
+ currentWriter = null;
+ }
+
+ @Override
+ public long recordCount() {
+ return recordCount;
+ }
+
+ @Override
+ public void abort() {
+ if (currentWriter != null) {
+ currentWriter.abort();
+ }
+ for (AbortExecutor abortExecutor : closedWriters) {
+ abortExecutor.abort();
+ }
+ }
+
+ @Override
+ public List<R> result() {
+ Preconditions.checkState(closed, "Cannot access the results until all writers are closed.");
+ return results;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ try {
+ closeCurrentWriter();
+ } catch (IOException e) {
+ LOG.warn(
+ "Exception occurs when writing file " + currentWriter.path() + ". Cleaning up.",
+ e);
+ abort();
+ throw e;
+ } finally {
+ closed = true;
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 04b69d3..c309a10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -36,8 +36,8 @@
import java.util.Arrays;
import java.util.List;
-/** {@link RollingFileWriter} for data files containing {@link InternalRow}. */
-public class RowDataRollingFileWriter extends RollingFileWriter<InternalRow, DataFileMeta> {
+/** {@link RollingFileWriterImpl} for data files containing {@link InternalRow}. */
+public class RowDataRollingFileWriter extends RollingFileWriterImpl<InternalRow, DataFileMeta> {
public RowDataRollingFileWriter(
FileIO fileIO,
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 38d8e0a..1a68b15 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -27,6 +27,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RollingFileWriterImpl;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.operation.metrics.CacheMetrics;
import org.apache.paimon.partition.PartitionPredicate;
@@ -146,14 +147,14 @@
try {
writer.write(entries);
writer.close();
+ return writer.result();
} catch (Exception e) {
throw new RuntimeException(e);
}
- return writer.result();
}
public RollingFileWriter<ManifestEntry, ManifestFileMeta> createRollingWriter() {
- return new RollingFileWriter<>(
+ return new RollingFileWriterImpl<>(
() -> new ManifestEntryWriter(writerFactory, pathFactory.newPath(), compression),
suggestedFileSize);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
index 5d2e99e..62bfd0c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
@@ -35,6 +35,16 @@
/** The stats utils to create {@link SimpleColStatsCollector.Factory}s. */
public class StatsCollectorFactories {
+ private final CoreOptions options;
+
+ public StatsCollectorFactories(CoreOptions options) {
+ this.options = options;
+ }
+
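+ /** Returns the per-field stats collector factories derived from the configured stats mode. */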
+ public SimpleColStatsCollector.Factory[] statsCollectors(List<String> fieldNames) {
+ return createStatsFactories(options.statsMode(), options, fieldNames);
+ }
+
public static SimpleColStatsCollector.Factory[] createStatsFactories(
String statsMode, CoreOptions options, List<String> fields) {
return createStatsFactories(statsMode, options, fields, Collections.emptyList());
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
new file mode 100644
index 0000000..fef2e59
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.StatsCollectorFactories;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RollingBlobFileWriter}. */
+public class RollingBlobFileWriterTest {
+
+ private static final RowType SCHEMA =
+ RowType.builder()
+ .field("f0", DataTypes.INT())
+ .field("f1", DataTypes.STRING())
+ .field("f2", DataTypes.BLOB())
+ .build();
+
+ private static final long TARGET_FILE_SIZE = 12 * 1024 * 1024L; // 12 MB
+ private static final long SCHEMA_ID = 1L;
+ private static final String COMPRESSION = "zstd";
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private RollingBlobFileWriter writer;
+ private DataFilePathFactory pathFactory;
+ private LongCounter seqNumCounter;
+ private byte[] testBlobData;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ // Create test blob data
+ testBlobData = new byte[1024 * 1024]; // 1 MB
+ new Random(42).nextBytes(testBlobData);
+
+ // Setup file system and path factory
+ LocalFileIO fileIO = LocalFileIO.create();
+ pathFactory =
+ new DataFilePathFactory(
+ new Path(tempDir + "/bucket-0"),
+ "parquet",
+ "data",
+ "changelog",
+ false,
+ null,
+ null);
+ seqNumCounter = new LongCounter();
+
+ // Initialize the writer
+ writer =
+ new RollingBlobFileWriter(
+ fileIO,
+ SCHEMA_ID,
+ FileFormat.fromIdentifier("parquet", new Options()),
+ TARGET_FILE_SIZE,
+ SCHEMA,
+ pathFactory,
+ seqNumCounter,
+ COMPRESSION,
+ new StatsCollectorFactories(new CoreOptions(new Options())),
+ new FileIndexOptions(),
+ FileSource.APPEND,
+ false, // asyncFileWrite
+ false // statsDenseStore
+ );
+ }
+
+ @Test
+ public void testBasicWriting() throws IOException {
+ // Write a single row with blob data
+ InternalRow row =
+ GenericRow.of(1, BinaryString.fromString("test"), new BlobData(testBlobData));
+ writer.write(row);
+
+ assertThat(writer.recordCount()).isEqualTo(1);
+ }
+
+ @Test
+ public void testMultipleWrites() throws IOException {
+ // Write multiple rows
+ for (int i = 0; i < 36; i++) {
+ InternalRow row =
+ GenericRow.of(
+ i, BinaryString.fromString("test" + i), new BlobData(testBlobData));
+ writer.write(row);
+ }
+
+ writer.close();
+ List<DataFileMeta> metasResult = writer.result();
+
+ assertThat(metasResult.size()).isEqualTo(4);
+ assertThat(metasResult.get(0).fileFormat()).isEqualTo("parquet");
+ assertThat(metasResult.subList(1, 4)).allMatch(f -> f.fileFormat().equals("blob"));
+ assertThat(writer.recordCount()).isEqualTo(36);
+
+ assertThat(metasResult.get(0).rowCount())
+ .isEqualTo(
+ metasResult.subList(1, 4).stream().mapToLong(DataFileMeta::rowCount).sum());
+ }
+
+ @Test
+ public void testBundleWriting() throws IOException {
+ // Create a bundle of records
+ List<InternalRow> rows =
+ Arrays.asList(
+ GenericRow.of(
+ 1, BinaryString.fromString("test1"), new BlobData(testBlobData)),
+ GenericRow.of(
+ 2, BinaryString.fromString("test2"), new BlobData(testBlobData)),
+ GenericRow.of(
+ 3, BinaryString.fromString("test3"), new BlobData(testBlobData)));
+
+ // Write bundle
+ writer.writeBundle(new TestBundleRecords(rows));
+
+ assertThat(writer.recordCount()).isEqualTo(3);
+ }
+
+ @Test
+ public void testDoubleClose() throws IOException {
+ // Write some data
+ InternalRow row =
+ GenericRow.of(1, BinaryString.fromString("test"), new BlobData(testBlobData));
+ writer.write(row);
+
+ // Close twice - should not throw exception
+ writer.close();
+ writer.close();
+
+ // Should be able to get results
+ List<DataFileMeta> results = writer.result();
+ assertThat(results).isNotEmpty();
+ }
+
+ @Test
+ public void testSchemaValidation() throws IOException {
+ // Test that the writer correctly handles the schema with blob field
+ InternalRow row =
+ GenericRow.of(1, BinaryString.fromString("test"), new BlobData(testBlobData));
+ writer.write(row);
+ writer.close();
+
+ List<DataFileMeta> results = writer.result();
+
+ // Verify schema ID is set correctly
+ results.forEach(file -> assertThat(file.schemaId()).isEqualTo(SCHEMA_ID));
+ }
+
+ /** Simple implementation of BundleRecords for testing. */
+ private static class TestBundleRecords implements BundleRecords {
+ private final List<InternalRow> rows;
+
+ public TestBundleRecords(List<InternalRow> rows) {
+ this.rows = rows;
+ }
+
+ @Override
+ public java.util.Iterator<InternalRow> iterator() {
+ return rows.iterator();
+ }
+
+ @Override
+ public long rowCount() {
+ return rows.size();
+ }
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 02f096f..97d8638 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -101,8 +101,9 @@
KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format);
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
- RollingFileWriter<KeyValue, DataFileMeta> writer =
- writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
+ RollingFileWriterImpl<KeyValue, DataFileMeta> writer =
+ (RollingFileWriterImpl<KeyValue, DataFileMeta>)
+ writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
writer.write(CloseableIterator.fromList(data.content, kv -> {}));
writer.close();
List<DataFileMeta> actualMetas = writer.result();
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 8e82a41..8ac16aa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -44,7 +44,7 @@
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link RollingFileWriter}. */
+/** Test for {@link RollingFileWriterImpl}. */
public class RollingFileWriterTest {
private static final RowType SCHEMA =
@@ -58,7 +58,7 @@
@TempDir java.nio.file.Path tempDir;
- private RollingFileWriter<InternalRow, DataFileMeta> rollingFileWriter;
+ private RollingFileWriterImpl<InternalRow, DataFileMeta> rollingFileWriter;
public void initialize(String identifier) {
initialize(identifier, false);
@@ -67,7 +67,7 @@
public void initialize(String identifier, boolean statsDenseStore) {
FileFormat fileFormat = FileFormat.fromIdentifier(identifier, new Options());
rollingFileWriter =
- new RollingFileWriter<>(
+ new RollingFileWriterImpl<>(
() ->
new RowDataFileWriter(
LocalFileIO.create(),