blob: b7127ae86683ea3a28dc1cca809f137e6b38e758 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Column comment directive utilities for BLOB / VECTOR type conversion.
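Mirrors Java's ColumnDirectiveUtils. A directive sits at the very start of a
column comment and may be followed by ";" and the real comment text, which is
kept as the column's comment once the directive is stripped.
Supported directives (directive -> table option that receives the column name):

    __BLOB_FIELD                  -> blob-field
    __BLOB_DESCRIPTOR_FIELD       -> blob-descriptor-field
    __BLOB_VIEW_FIELD             -> blob-view-field
    __BLOB_EXTERNAL_STORAGE_FIELD -> blob-external-storage-field + blob-descriptor-field
    __VECTOR_FIELD;dim            -> vector-field (dim is a required integer >= 1)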
"""
from dataclasses import dataclass
from typing import Dict, List, Optional
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
DataType, VectorType)
# Directive markers recognised at the start of a column comment.
BLOB_FIELD_DIRECTIVE = "__BLOB_FIELD"
BLOB_DESCRIPTOR_FIELD_DIRECTIVE = "__BLOB_DESCRIPTOR_FIELD"
BLOB_VIEW_FIELD_DIRECTIVE = "__BLOB_VIEW_FIELD"
BLOB_EXTERNAL_STORAGE_FIELD_DIRECTIVE = "__BLOB_EXTERNAL_STORAGE_FIELD"
VECTOR_FIELD_DIRECTIVE = "__VECTOR_FIELD"
# (marker, option key) pairs scanned in this order by parse_add_column_comment.
_BLOB_DIRECTIVES = [
    (BLOB_EXTERNAL_STORAGE_FIELD_DIRECTIVE, CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()),
    (BLOB_VIEW_FIELD_DIRECTIVE, CoreOptions.BLOB_VIEW_FIELD.key()),
    (BLOB_DESCRIPTOR_FIELD_DIRECTIVE, CoreOptions.BLOB_DESCRIPTOR_FIELD.key()),
    (BLOB_FIELD_DIRECTIVE, CoreOptions.BLOB_FIELD.key()),
]
# Options cleaned by remove_dropped_directive_options when a BLOB column is dropped.
_BLOB_OPTIONS = [
    CoreOptions.BLOB_FIELD,
    CoreOptions.BLOB_DESCRIPTOR_FIELD,
    CoreOptions.BLOB_VIEW_FIELD,
    CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD,
]
# Options cleaned by remove_dropped_directive_options when a VECTOR column is dropped.
_VECTOR_OPTIONS = [
    CoreOptions.VECTOR_FIELD,
]
# Legacy fallback keys that map to canonical option keys.
# Java's blob-descriptor-field has fallback key "blob.stored-descriptor-fields".
# _modify_field_options migrates a value found under a fallback key to the
# canonical key; remove_dropped_directive_options cleans both.
_FALLBACK_KEYS = {
    CoreOptions.BLOB_DESCRIPTOR_FIELD.key(): ["blob.stored-descriptor-fields"],
}
@dataclass
class ParsedDirective:
    """Result of parsing a BLOB or VECTOR directive out of a column comment."""

    # Option key that should receive the column name (taken from
    # _BLOB_DIRECTIVES, or CoreOptions.VECTOR_FIELD for a vector directive).
    option_key: str
    # Text that followed the directive, stripped; None when nothing remained.
    real_comment: Optional[str]
    # True when the directive was parsed as a VECTOR directive, False for BLOB.
    is_vector: bool
    # Dimension parsed from a VECTOR directive; BLOB directives are built with 0.
    vector_dim: int
@dataclass
class ConvertedColumn:
    """Outcome of applying a directive to a column: converted type plus cleaned comment."""

    # Data type produced by _convert_type for the column.
    type: DataType
    # Column comment with the directive removed; None when no text remained.
    comment: Optional[str]
def parse_add_column_comment(comment: Optional[str]) -> Optional[ParsedDirective]:
    """Extract a BLOB or VECTOR directive from the start of a column comment.

    Args:
        comment: Raw column comment; surrounding whitespace is ignored.

    Returns:
        The parsed directive, or None when the comment is None, blank, or
        begins with neither ``__VECTOR`` nor ``__BLOB``.

    Raises:
        ValueError: If the comment begins with ``__BLOB`` but matches none of
            the markers in ``_BLOB_DIRECTIVES``, or if the delegated VECTOR
            parsing rejects it.
    """
    if comment is None:
        return None
    text = comment.strip()
    if not text:
        return None
    if text.startswith("__VECTOR"):
        return _parse_vector_directive(text)
    if not text.startswith("__BLOB"):
        return None

    # A marker matches only when it is the whole comment or is immediately
    # followed by ';'. Since no marker contains ';', that is the same as the
    # marker being equal to everything before the first ';'.
    head, _, trailing = text.partition(';')
    for directive_marker, key in _BLOB_DIRECTIVES:
        if head == directive_marker:
            return ParsedDirective(key, trailing.strip() or None, False, 0)

    raise ValueError(
        f"Unsupported BLOB directive in column comment: '{text}'. "
        f"Supported directives are '{BLOB_FIELD_DIRECTIVE}', "
        f"'{BLOB_DESCRIPTOR_FIELD_DIRECTIVE}', '{BLOB_VIEW_FIELD_DIRECTIVE}' "
        f"and '{BLOB_EXTERNAL_STORAGE_FIELD_DIRECTIVE}'."
    )
def _parse_vector_directive(comment: str) -> ParsedDirective:
    """Parse a ``__VECTOR_FIELD;dim[;comment]`` directive.

    Args:
        comment: Stripped column comment to parse.

    Returns:
        A ParsedDirective with ``is_vector`` set, the parsed dimension, and
        the trailing comment text (None when there is none).

    Raises:
        ValueError: If the comment does not start with the vector marker, if
            the marker is not immediately followed by ``;``, if the dimension
            is not an integer, or if it is smaller than 1.
    """
    marker = VECTOR_FIELD_DIRECTIVE
    if not comment.startswith(marker):
        raise ValueError(
            f"Unsupported VECTOR directive in column comment: '{comment}'. "
            f"Supported directive is '{VECTOR_FIELD_DIRECTIVE}'."
        )

    after_marker = comment[len(marker):]
    if not after_marker.startswith(';'):
        raise ValueError(
            f"VECTOR directive '{comment}' requires a dimension, "
            f"e.g. '{VECTOR_FIELD_DIRECTIVE};128' or "
            f"'{VECTOR_FIELD_DIRECTIVE};128; my comment'."
        )

    # Everything up to the next ';' is the dimension; the remainder (if any)
    # is the user's real comment.
    dim_part, _, trailing = after_marker[1:].partition(';')
    dim_str = dim_part.strip()
    real_comment = trailing.strip() or None

    try:
        dim = int(dim_str)
    except ValueError as err:
        # Chain explicitly so the original parse failure stays attached as the cause.
        raise ValueError(
            f"Expected an integer dimension after '{VECTOR_FIELD_DIRECTIVE};', "
            f"but got: '{dim_str}'."
        ) from err
    if dim < 1:
        raise ValueError(f"Vector dimension must be >= 1, but got: {dim}.")

    return ParsedDirective(CoreOptions.VECTOR_FIELD.key(), real_comment, True, dim)
def _convert_type(directive: ParsedDirective, field_name: str, source_type: DataType) -> DataType:
    """Convert a column's declared type to the type requested by its directive.

    Args:
        directive: Parsed directive for the column.
        field_name: Column name, used only in error messages.
        source_type: Type the column was declared with.

    Returns:
        A VectorType built from the array's nullability, element and the
        directive's dimension for a VECTOR directive; otherwise a ``BLOB``
        AtomicType carrying the source nullability.

    Raises:
        ValueError: If a VECTOR directive is used on a non-ArrayType, or a
            BLOB directive on anything other than an AtomicType named
            VARBINARY, BINARY, BYTES or BLOB.
    """
    if directive.is_vector:
        if isinstance(source_type, ArrayType):
            return VectorType(source_type.nullable, source_type.element, directive.vector_dim)
        raise ValueError(
            f"Column {field_name} declared with a VECTOR directive "
            f"must be of ARRAY type, but was {source_type}."
        )

    # Only atomic binary-like types may be promoted to BLOB.
    blob_compatible = (
        isinstance(source_type, AtomicType)
        and getattr(source_type, 'type', None) in ('VARBINARY', 'BINARY', 'BYTES', 'BLOB')
    )
    if not blob_compatible:
        raise ValueError(
            f"Column {field_name} declared with a BLOB directive "
            f"must be of BYTES, BINARY or BLOB type, but was {source_type}."
        )
    return AtomicType('BLOB', source_type.nullable)
def _modify_field_options(option_key: str, field_name: str, options: Dict[str, str]):
existing = options.get(option_key)
if not existing:
for fk in _FALLBACK_KEYS.get(option_key, []):
fallback_value = options.pop(fk, None)
if fallback_value:
existing = fallback_value
break
if existing:
options[option_key] = existing + "," + field_name
else:
options[option_key] = field_name
def apply_add_column_directive(
    comment: Optional[str],
    field_name: str,
    source_type: DataType,
    options: Dict[str, str],
) -> Optional[ConvertedColumn]:
    """Apply the directive found in a column comment, if any.

    The type is converted first, so ``options`` is left untouched when the
    conversion rejects the column. On success the column name is registered
    under the directive's option key; an external-storage directive
    additionally registers it under the blob descriptor option.

    Args:
        comment: Column comment that may start with a directive.
        field_name: Name of the column.
        source_type: Declared type of the column.
        options: Table options, modified in place.

    Returns:
        The converted type and cleaned comment, or None when the comment
        carries no directive.
    """
    parsed = parse_add_column_comment(comment)
    if parsed is None:
        return None

    converted_type = _convert_type(parsed, field_name, source_type)

    _modify_field_options(parsed.option_key, field_name, options)
    uses_external_storage = (
        parsed.option_key == CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()
    )
    if uses_external_storage:
        _modify_field_options(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), field_name, options)

    return ConvertedColumn(converted_type, parsed.real_comment)
def apply_directives(fields: List[DataField], options: Dict[str, str]):
    """Process comment directives on fields for CREATE TABLE.

    Modifies the ``fields`` list and ``options`` dict in place and returns
    True if any directive was applied.

    Changes are staged and committed only after every field has been
    processed, so a ValueError raised for one field leaves both ``fields``
    and ``options`` exactly as they were passed in instead of half-converted.

    Args:
        fields: Table fields; entries carrying a directive are replaced with
            a DataField built from the same id and name, the converted type
            and the cleaned comment.
        options: Table options; receives the directive-managed option values.

    Returns:
        True if at least one field was converted, otherwise False.

    Raises:
        ValueError: Propagated from directive parsing or type conversion.
    """
    staged_options = dict(options)
    replacements = {}
    for index, field in enumerate(fields):
        converted = apply_add_column_directive(
            field.description, field.name, field.type, staged_options
        )
        if converted is not None:
            replacements[index] = DataField(
                field.id, field.name, converted.type, converted.comment
            )

    if not replacements:
        # No directive matched, so nothing was staged.
        return False

    # Commit: every field parsed and converted successfully.
    for index, new_field in replacements.items():
        fields[index] = new_field
    options.clear()
    options.update(staged_options)
    return True
def _remove_from_csv_option(key: str, field_name: str, options: Dict[str, str]):
existing = options.get(key)
if not existing:
return
parts = [v.strip() for v in existing.split(",") if v.strip() and v.strip() != field_name]
if parts:
options[key] = ",".join(parts)
else:
options.pop(key, None)
def remove_dropped_directive_options(
    field_name: str, type_root: str, options: Dict[str, str]
):
    """Remove directive-managed options when a BLOB or VECTOR column is dropped.

    For ``'VECTOR'`` the column is removed from every option in
    ``_VECTOR_OPTIONS`` and its ``field.<name>.vector-dim`` entry is deleted.
    For ``'BLOB'`` the column is removed from every option in
    ``_BLOB_OPTIONS`` and from each option's legacy fallback keys. Any other
    ``type_root`` leaves ``options`` unchanged.

    Args:
        field_name: Name of the dropped column.
        type_root: Type root of the dropped column; compared case-sensitively.
        options: Table options, modified in place.
    """
    if type_root == 'VECTOR':
        for vector_option in _VECTOR_OPTIONS:
            _remove_from_csv_option(vector_option.key(), field_name, options)
        options.pop(f"field.{field_name}.vector-dim", None)
        return

    if type_root != 'BLOB':
        return

    for blob_option in _BLOB_OPTIONS:
        canonical_key = blob_option.key()
        # Clean the canonical key first, then any legacy aliases of it.
        for option_key in [canonical_key, *_FALLBACK_KEYS.get(canonical_key, [])]:
            _remove_from_csv_option(option_key, field_name, options)