################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import uuid
from datetime import datetime
from typing import List, Optional, Tuple, Dict
import pyarrow as pa
from pypaimon.common.core_options import CoreOptions
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.write.writer.data_writer import DataWriter
logger = logging.getLogger(__name__)
class DataBlobWriter(DataWriter):
"""
A rolling file writer that handles both normal data and blob data. This writer creates separate
files for normal columns and blob columns, managing their lifecycle independently.

For example, given a table schema with normal columns (id INT, name STRING) and a blob column
(data BLOB), this writer creates separate files for (id, name) and (data).

Key features:
- Blob data can roll independently when normal data doesn't need rolling
- When normal data rolls, blob data MUST also be closed (Java behavior)
- Blob data uses more aggressive rolling (smaller target size) to prevent memory issues
- One normal data file may correspond to multiple blob data files
- Blob data is written to disk immediately to avoid buffering large blob payloads in memory
- Blob file metadata is stored as separate DataFileMeta objects after the normal file metadata

Rolling behavior:
- Normal data rolls: both normal and blob writers are closed together, and blob metadata is added after the normal metadata
- Blob data rolls independently: only the blob writer is closed, and its metadata is cached until the normal data rolls

Metadata organization:
- Normal file metadata is added first to committed_files
- Blob file metadata is added after the normal file metadata in committed_files
- When blob data rolls independently, its metadata is cached until the normal data rolls
- Result: [normal_meta, blob_meta1, blob_meta2, blob_meta3, ...]

Example file organization:
committed_files = [
    normal_file1_meta,  # f1.parquet metadata
    blob_file1_meta,    # b1.blob metadata
    blob_file2_meta,    # b2.blob metadata
    blob_file3_meta,    # b3.blob metadata
    normal_file2_meta,  # f2.parquet metadata
    blob_file4_meta,    # b4.blob metadata
    blob_file5_meta,    # b5.blob metadata
]

This matches the Java RollingBlobFileWriter behavior exactly.
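
Illustrative usage (a sketch only; the table handle, partition value, bucket id, sequence
number and options below are placeholders, not values produced by this module):

    writer = DataBlobWriter(table=my_table, partition=('2024-01-01',), bucket=0,
                            max_seq_number=0, options={})
    for batch in record_batches:          # pyarrow.RecordBatch instances
        writer.write(batch)               # normal and blob columns are split internally
    file_metas = writer.prepare_commit()  # [normal_meta, blob_meta..., normal_meta, ...]
    writer.close()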
"""
# Constant for checking rolling condition periodically
CHECK_ROLLING_RECORD_CNT = 1000
def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, options: Optional[Dict[str, str]] = None):
super().__init__(table, partition, bucket, max_seq_number, options)
# Determine blob column from table schema
self.blob_column_name = self._get_blob_columns_from_schema()
# Split schema into normal and blob columns
all_column_names = self.table.field_names
self.normal_column_names = [col for col in all_column_names if col != self.blob_column_name]
self.write_cols = self.normal_column_names
# State management for blob writer
self.record_count = 0
self.closed = False
# Track pending data for normal data only
self.pending_normal_data: Optional[pa.Table] = None
# Initialize blob writer with blob column name
from pypaimon.write.writer.blob_writer import BlobWriter
self.blob_writer = BlobWriter(
table=self.table,
partition=self.partition,
bucket=self.bucket,
max_seq_number=max_seq_number,
blob_column=self.blob_column_name,
options=options
)
logger.info(f"Initialized DataBlobWriter with blob column: {self.blob_column_name}")
def _get_blob_columns_from_schema(self) -> str:
blob_columns = []
for field in self.table.table_schema.fields:
type_str = str(field.type).lower()
if 'blob' in type_str:
blob_columns.append(field.name)
# Validate blob column count (matching Java constraint)
if len(blob_columns) == 0:
raise ValueError("No blob field found in table schema.")
elif len(blob_columns) > 1:
raise ValueError("Limit exactly one blob field in one paimon table yet.")
return blob_columns[0] # Return single blob column name
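# Illustrative example (hypothetical schema, not part of this module): for a table declared
# as (id INT, name STRING, data BLOB), the loop above collects ['data'] and the method
# returns 'data'; zero BLOB columns or more than one BLOB column raises a ValueError.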
def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
normal_data, _ = self._split_data(data)
return normal_data
def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
return self._merge_normal_data(existing_data, new_data)
def write(self, data: pa.RecordBatch):
try:
# Split data into normal and blob parts
normal_data, blob_data = self._split_data(data)
# Process and accumulate normal data
processed_normal = self._process_normal_data(normal_data)
if self.pending_normal_data is None:
self.pending_normal_data = processed_normal
else:
self.pending_normal_data = self._merge_normal_data(self.pending_normal_data, processed_normal)
# Write blob data directly to blob writer (handles its own rolling)
if blob_data is not None and blob_data.num_rows > 0:
# Write blob data directly to blob writer
self.blob_writer.write(blob_data)
self.record_count += data.num_rows
# Check if normal data rolling is needed
if self._should_roll_normal():
# When normal data rolls, close both writers and fetch blob metadata
self._close_current_writers()
except Exception as e:
logger.error("Exception occurs when writing data. Cleaning up.", exc_info=e)
self.abort()
raise
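# Worked example of the flow above (illustrative figures, not configuration defaults): with a
# normal target size of 128 MiB and a smaller blob target size, several calls to
# self.blob_writer.write() may roll blob files internally while pending_normal_data keeps
# accumulating; once _should_roll_normal() reports True, _close_current_writers() flushes the
# normal file and collects every cached blob DataFileMeta behind it.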
def prepare_commit(self) -> List[DataFileMeta]:
# Close any remaining data
self._close_current_writers()
return self.committed_files.copy()
def close(self):
if self.closed:
return
try:
if self.pending_normal_data is not None and self.pending_normal_data.num_rows > 0:
self._close_current_writers()
except Exception as e:
logger.error("Exception occurs when closing writer. Cleaning up.", exc_info=e)
self.abort()
finally:
self.closed = True
self.pending_normal_data = None
def abort(self):
"""Abort all writers and clean up resources."""
self.blob_writer.abort()
self.pending_normal_data = None
self.committed_files.clear()
def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBatch]:
"""Split data into normal and blob parts based on column names."""
# Use the pre-computed column names
normal_columns = self.normal_column_names
blob_columns = [self.blob_column_name] # Single blob column
# Create projected batches
normal_data = data.select(normal_columns) if normal_columns else None
blob_data = data.select(blob_columns) if blob_columns else None
return normal_data, blob_data
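# Example (hypothetical columns): for a batch with columns (id, name, data) where 'data' is
# the blob column, normal_data carries (id, name) and blob_data carries (data); if the table
# had only a blob column, normal_column_names would be empty and normal_data would be None.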
def _process_normal_data(self, data: pa.RecordBatch) -> pa.Table:
"""Process normal data (similar to base DataWriter)."""
if data is None or data.num_rows == 0:
# pa.Table.from_batches() requires an explicit schema when the batch list is empty
schema = data.schema if data is not None else pa.schema([])
return pa.Table.from_batches([], schema=schema)
return pa.Table.from_batches([data])
def _merge_normal_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
return pa.concat_tables([existing_data, new_data])
def _should_roll_normal(self) -> bool:
if self.pending_normal_data is None:
return False
# Check rolling condition periodically (every CHECK_ROLLING_RECORD_CNT records)
if self.record_count % self.CHECK_ROLLING_RECORD_CNT != 0:
return False
# Check if normal data exceeds target size
current_size = self.pending_normal_data.nbytes
return current_size > self.target_file_size
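# Example of the check above (illustrative numbers): with CHECK_ROLLING_RECORD_CNT = 1000 and
# a target_file_size of 256 MiB, a record_count of 3000 with pending_normal_data.nbytes of
# 300 MiB returns True, while a record_count of 2500 returns False regardless of size because
# the modulo test only samples every 1000th cumulative record.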
def _close_current_writers(self):
"""Close both normal and blob writers and add blob metadata after normal metadata (Java behavior)."""
if self.pending_normal_data is None or self.pending_normal_data.num_rows == 0:
return
# Close normal writer and get metadata
normal_meta = self._write_normal_data_to_file(self.pending_normal_data)
# Fetch blob metadata from blob writer
blob_metas = self.blob_writer.prepare_commit()
# Validate consistency between normal and blob files (Java behavior)
self._validate_consistency(normal_meta, blob_metas)
# Add normal file metadata first
self.committed_files.append(normal_meta)
# Add blob file metadata after normal metadata
self.committed_files.extend(blob_metas)
# Reset pending data
self.pending_normal_data = None
logger.info(f"Closed both writers - normal: {normal_meta.file_name}, "
f"added {len(blob_metas)} blob file metadata after normal metadata")
def _write_normal_data_to_file(self, data: pa.Table) -> DataFileMeta:
if data.num_rows == 0:
return None
file_name = f"{CoreOptions.data_file_prefix(self.options)}{uuid.uuid4()}-0.{self.file_format}"
file_path = self._generate_file_path(file_name)
# Write file based on format
if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
self.file_io.write_parquet(file_path, data, compression=self.compression)
elif self.file_format == CoreOptions.FILE_FORMAT_ORC:
self.file_io.write_orc(file_path, data, compression=self.compression)
elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
self.file_io.write_avro(file_path, data)
elif self.file_format == CoreOptions.FILE_FORMAT_LANCE:
self.file_io.write_lance(file_path, data)
else:
raise ValueError(f"Unsupported file format: {self.file_format}")
# Determine if this is an external path
is_external_path = self.external_path_provider is not None
external_path_str = file_path if is_external_path else None
return self._create_data_file_meta(file_name, file_path, data, external_path_str)
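# Example of the resulting layout (assuming a configured data file prefix of 'data-' and
# parquet as the file format): the file name looks like 'data-<uuid>-0.parquet' and file_path
# points at the location returned by _generate_file_path(); when an external path provider is
# set, the same path is also recorded as external_path on the DataFileMeta.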
def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
external_path: Optional[str] = None) -> DataFileMeta:
# Column stats (only for normal columns)
column_stats = {
field.name: self._get_column_stats(data, field.name)
for field in self.table.table_schema.fields
if field.name != self.blob_column_name
}
# Get normal fields only
normal_fields = [field for field in self.table.table_schema.fields
if field.name != self.blob_column_name]
min_value_stats = [column_stats[field.name]['min_values'] for field in normal_fields]
max_value_stats = [column_stats[field.name]['max_values'] for field in normal_fields]
value_null_counts = [column_stats[field.name]['null_counts'] for field in normal_fields]
self.sequence_generator.start = self.sequence_generator.current
return DataFileMeta(
file_name=file_name,
file_size=self.file_io.get_file_size(file_path),
row_count=data.num_rows,
min_key=GenericRow([], []),
max_key=GenericRow([], []),
key_stats=SimpleStats(
GenericRow([], []),
GenericRow([], []),
[]),
value_stats=SimpleStats(
GenericRow(min_value_stats, normal_fields),
GenericRow(max_value_stats, normal_fields),
value_null_counts),
min_sequence_number=-1,
max_sequence_number=-1,
schema_id=self.table.table_schema.id,
level=0,
extra_files=[],
creation_time=datetime.now(),
delete_row_count=0,
file_source=0,
value_stats_cols=self.normal_column_names,
external_path=external_path,
file_path=file_path,
write_cols=self.write_cols)
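# Illustrative shape of the stats above (hypothetical two-column table with normal fields id
# and name): min_value_stats == [min(id), min(name)], max_value_stats == [max(id), max(name)]
# and value_null_counts == [null_count(id), null_count(name)], i.e. one entry per normal field
# in schema order, with the blob column excluded from all statistics.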
def _validate_consistency(self, normal_meta: DataFileMeta, blob_metas: List[DataFileMeta]):
if normal_meta is None:
return
normal_row_count = normal_meta.row_count
blob_row_count = sum(meta.row_count for meta in blob_metas)
if normal_row_count != blob_row_count:
raise RuntimeError(
f"This is a bug: The row count of main file and blob files does not match. "
f"Main file: {normal_meta.file_name} (row count: {normal_row_count}), "
f"blob files: {[meta.file_name for meta in blob_metas]} (total row count: {blob_row_count})"
)
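# Worked example of the consistency check (illustrative row counts): a normal file with
# row_count 1000 paired with blob files of 400 + 600 rows passes, while blob files of
# 400 + 500 rows (total 900) raise the RuntimeError above, since the total blob row count is
# expected to match the row count of the normal file written alongside them.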