blob: 7b917be4e310f5f5a2b90dc1d4a8e5d9b4de3014 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""The ``$schemas`` system table — every committed table schema."""
import json
from typing import Any, List, Optional
import pyarrow
from pypaimon.schema.data_types import AtomicType, DataField, RowType
from pypaimon.table.system.system_table import SystemTable
# Row type of the ``$schemas`` system table; this is the object handed back
# by ``SchemasTable.row_type()``. Field ids run 0..6 in column order and
# ``comment`` is the only column declared nullable.
# NOTE(review): the leading ``False`` is presumably RowType's own
# nullability flag -- confirm against the RowType constructor.
TABLE_TYPE = RowType(False, [
DataField(0, "schema_id", AtomicType("BIGINT", nullable=False)),
# The next four STRING columns hold JSON text produced by ``_to_json``
# in ``SchemasTable._build_arrow_table``.
DataField(1, "fields", AtomicType("STRING", nullable=False)),
DataField(2, "partition_keys", AtomicType("STRING", nullable=False)),
DataField(3, "primary_keys", AtomicType("STRING", nullable=False)),
DataField(4, "options", AtomicType("STRING", nullable=False)),
DataField(5, "comment", AtomicType("STRING", nullable=True)),
# Filled from each schema's ``time_millis`` value.
DataField(6, "update_time", AtomicType("TIMESTAMP(3)", nullable=False)),
])
# Arrow type used when materialising the ``update_time`` column; the
# millisecond unit goes with the TIMESTAMP(3) declared above.
_TIMESTAMP_TYPE = pyarrow.timestamp("ms")
def _to_json(value: Any) -> str:
"""Compact JSON encoding for the four serialised columns.
Field ordering follows the source object (insertion order for dicts
and lists is preserved by ``json.dumps``); bit-exact equality with
any external encoder is not promised — callers compare decoded
structures, not raw strings.
"""
return json.dumps(value, separators=(',', ':'), ensure_ascii=False)
class SchemasTable(SystemTable):
    """System table ``$schemas``: one row per schema listed by the base
    table's schema manager."""

    def system_table_name(self) -> str:
        """Return the name of this system table, ``"schemas"``."""
        return "schemas"

    def row_type(self) -> RowType:
        """Return the module-level ``TABLE_TYPE`` describing the columns."""
        return TABLE_TYPE

    def primary_keys(self) -> List[str]:
        """Return the key columns of this table: just ``schema_id``."""
        return ["schema_id"]

    def _build_arrow_table(self) -> pyarrow.Table:
        """Materialise every listed schema into a ``pyarrow.Table``.

        Each schema from ``self.base_table.schema_manager.list_all()``
        becomes one row. ``fields``, ``partition_keys``, ``primary_keys``
        and ``options`` are stored as JSON strings via ``_to_json``;
        ``comment`` is passed through unchanged and ``time_millis`` is
        stored in the ``update_time`` column.

        Returns:
            A table with the columns ``schema_id``, ``fields``,
            ``partition_keys``, ``primary_keys``, ``options``, ``comment``
            and ``update_time``, in that order.
        """
        # Column name -> Arrow type, in output order. The position of each
        # entry matches the position of its value in the row tuples below.
        column_layout = (
            ("schema_id", pyarrow.int64()),
            ("fields", pyarrow.string()),
            ("partition_keys", pyarrow.string()),
            ("primary_keys", pyarrow.string()),
            ("options", pyarrow.string()),
            ("comment", pyarrow.string()),
            ("update_time", _TIMESTAMP_TYPE),
        )

        # One tuple per schema, built in a single pass over the listing.
        rows = [
            (
                int(entry.id),
                _to_json([column.to_dict() for column in entry.fields]),
                _to_json(list(entry.partition_keys)),
                _to_json(list(entry.primary_keys)),
                _to_json(dict(entry.options)),
                entry.comment,
                int(entry.time_millis),
            )
            for entry in self.base_table.schema_manager.list_all()
        ]

        # Pivot the row tuples into one typed Arrow array per column.
        return pyarrow.table({
            name: pyarrow.array([row[position] for row in rows],
                                type=arrow_type)
            for position, (name, arrow_type) in enumerate(column_layout)
        })